| #!/usr/bin/env python3 |
| # Copyright 2023 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """CR and CQ +2 copybot project commits for downstreaming. |
| |
| See go/copybot |
| |
| For Zephyr Downstreaming Rotation: go/zephyr-downstreaming-guide |
| For coreboot Downstreaming Rotation: go/coreboot:downstreaming |
| """ |
| |
| from collections import defaultdict |
| import logging |
| import re |
| from typing import Callable, Dict, List, NamedTuple, Optional, Tuple |
| |
| from chromite.contrib.copybot_downstream_config import downstream_argparser |
| from chromite.lib import config_lib |
| from chromite.lib import gerrit |
| |
| |
| # Gerrit will merge a max of 240 dependencies. Leave some room |
| # for dependencies from the platform/ec repo. |
| MAX_GERRIT_CHANGES = 225 |
| REVIEWER_KEY_TEXT = "Original-Reviewed-by" |
| |
| |
| class PathDomains(NamedTuple): |
| """A filepath and the domains that must review it.""" |
| |
| path: str |
| domains: List[str] |
| |
| |
| class CopybotDownstream: |
| """Class for defining the functionality of the downstreaming review process.""" |
| |
| def __init__( |
| self, |
| project: str, |
| dry_run: bool = False, |
| cq_dry_run: bool = False, |
| limit: Optional[int] = None, |
| stop_at: Optional[str] = None, |
| ignore_warnings: bool = False, |
| ): |
| """Initialize the CopybotDownstream Object. |
| |
| Args: |
| project: Name of the project to be acted on. |
| dry_run: If True dry-run this pass without acting on gerrit |
| cq_dry_run: If True, use CQ+1 instead of CQ+2 |
| limit: Limit the number of CL's to be downstreamed |
| stop_at: Stop at the specified change(CL Number) |
| ignore_warnings: Ignore warnings and submit changes |
| """ |
| self.gerrit_helper = gerrit.GetGerritHelper( |
| config_lib.GetSiteParams().EXTERNAL_REMOTE |
| ) |
| # Map of functions to be called when the project in the key is encountered. |
| # |
| # List of tuples(function, list of arguments) where the format of the list |
| # can vary across functions. |
| # |
| # Functions should take a CL and perform any additional checks required by the project |
| # prior to downstreaming. |
| # |
| # Args: |
| # gerrit CL dict for use in parsing |
| # dynamic args for use in parsing |
| # Returns: |
| # List of warning strings to be printed for this CL |
| self.project = project |
| self.dry_run = dry_run |
| self.cq_dry_run = cq_dry_run |
| self.stop_at = stop_at |
| self.ignore_warnings = ignore_warnings |
| if not limit or limit > MAX_GERRIT_CHANGES: |
| logging.info( |
| "Limiting to maximum Gerrit changes (%d)", MAX_GERRIT_CHANGES |
| ) |
| limit = MAX_GERRIT_CHANGES |
| self.limit = limit |
| default_check_funcs = [ |
| [self.check_commit_message, ["\nC[Qq]-Depend:.*"]], |
| [self.check_hashtags, ["copybot-skip"]], |
| ] |
| self.check_funcs = default_check_funcs + self._project_check_funcs() |
| |
| def _project_check_funcs(self) -> List[List[Tuple[Callable, List]]]: |
| """Return a list of checkers specific to this project config. |
| |
| Since this is the general-case class, there are no project-specific |
| checkers. Subclasses should override this function to define their |
| specific needs. |
| """ |
| return [] |
| |
| @staticmethod |
| def project_passed_downstreamer_review_paths_check( |
| cl: Dict, paths: List[str] |
| ) -> List[str]: |
| """Check paths that require further downstreamer review. |
| |
| * Require additional review from any paths specified by paths. |
| |
| Args: |
| cl: gerrit CL dict for use in parsing. |
| paths: paths to check for this project. |
| |
| Returns: |
| warning_strings a list of strings to be printed for this CL. |
| * This must match the expected prototype used in check_funcs. |
| """ |
| warning_strings = [] |
| revision = cl["revisions"][cl["current_revision"]] |
| for path in paths: |
| if any(path in s for s in revision["files"]): |
| warning_strings.append( |
| f"Found filepath({path}) which requires downstreamer review" |
| ) |
| return warning_strings |
| |
| @staticmethod |
| def project_passed_domain_restricted_paths_check( |
| cl: Dict, paths_domains: List[PathDomains] |
| ) -> List[str]: |
| """Check paths that require further downstreamer review. |
| |
| * Require additional review if changes to paths specified by paths |
| are not reviewed by anyone in the domain list specified by paths. |
| |
| Args: |
| cl: gerrit CL dict for use in parsing. |
| paths_domains: list of tuples(path, restricted_domains), where path is the path to |
| be restricted, and restricted_domains is a list of domains that should |
| have been a part of the review. |
| |
| Returns: |
| warning_strings a list of strings to be printed for this CL. |
| * This must match the expected prototype used in check_funcs. |
| """ |
| warning_strings = [] |
| revision = cl["revisions"][cl["current_revision"]] |
| reviewers = [] |
| for line in revision["commit"]["message"].splitlines(): |
| if line.startswith(REVIEWER_KEY_TEXT): |
| reviewers.append(line[(len(REVIEWER_KEY_TEXT) + 1) :]) |
| for path, domains in paths_domains: |
| if any(path in s for s in revision["files"]): |
| reviewer_found = False |
| for domain in domains: |
| if any(domain in s for s in reviewers): |
| reviewer_found = True |
| if not reviewer_found: |
| warning_strings.append( |
| ( |
| f"Found filepath({path}) which requires downstreamer review from" |
| f"domain(s) {str(domains)}" |
| ) |
| ) |
| return warning_strings |
| |
| @staticmethod |
| def check_commit_message(cl: Dict, args: List[str]) -> List[str]: |
| """Check commit message for keywords. |
| |
| * Throw warning if keywords found in commit message. |
| This can be useful for banned words as well as logistical issues |
| such as CL's with CQ-Depend. |
| |
| Args: |
| cl: gerrit CL dict for use in parsing. |
| args: list of keywords to flag. |
| |
| Returns: |
| warning_strings a list of strings to be printed for this CL. |
| * This must match the expected prototype used in check_funcs. |
| """ |
| warning_strings = [] |
| for banned_term in args: |
| if re.search( |
| banned_term, |
| cl["revisions"][cl["current_revision"]]["commit"]["message"], |
| ): |
| printable_term = "".join(banned_term.splitlines()) |
| warning_strings.append(f"Found {printable_term} in change!") |
| return warning_strings |
| |
| @staticmethod |
| def check_hashtags(cl: Dict, args: List[str]) -> List[str]: |
| """Check hashtags for keywords. |
| |
| * Throw warning if keywords found in hashtags.. |
| |
| Args: |
| cl: gerrit CL dict for use in parsing. |
| args: list of keywords to flag. |
| |
| Returns: |
| warning_strings a list of strings to be printed for this CL. |
| * This must match the expected prototype used in checks. |
| """ |
| warning_strings = [] |
| for banned_hashtag in args: |
| if banned_hashtag in cl["hashtags"]: |
| warning_strings.append( |
| f"Change marked with hashtag {banned_hashtag}" |
| ) |
| return warning_strings |
| |
| def _find_cls_to_downstream(self) -> List[Dict]: |
| """Find all CLs to downstream for the given project. |
| |
| Returns: |
| A list of Gerrit CL dicts to downstream, including those which |
| have the {project}-downstream topic and the CLs to which they |
| are related. |
| """ |
| copybot_downstream_cls = self.gerrit_helper.Query( |
| topic=f"{self.project}-downstream", |
| status="open", |
| raw=True, |
| verbose=True, |
| convert_results=False, |
| ) |
| return copybot_downstream_cls |
| |
| def _check_cl( |
| self, |
| downstream_candidate_cl: Dict, |
| ) -> List[str]: |
| """Check whether the given CL is OK to downstream. |
| |
| Args: |
| downstream_candidate_cl: dict representing the CL that we want to downstream. |
| |
| Returns: |
| warnings: A list of warning strings stating problems with the CL. |
| If empty, that means there are no problems. |
| """ |
| warnings = [] |
| logging.info("Processing %s", downstream_candidate_cl["_number"]) |
| logging.debug( |
| "change info:\n\t%s", |
| "\n\t".join(f"{k}:{v}" for k, v in downstream_candidate_cl.items()), |
| ) |
| for func, extra_args in self.check_funcs: |
| tmp_warnings = func(downstream_candidate_cl, extra_args) |
| if tmp_warnings: |
| warnings.extend(tmp_warnings) |
| |
| return warnings |
| |
| def _filter_cls(self, cls_to_downstream: List[Dict]) -> List[Dict]: |
| """Filter full CL list based on the limit and/or CL the chain should stop at. |
| |
| Args: |
| cls_to_downstream: Ordered list of all CL candidates to be downstreamed. |
| |
| Returns: |
| Tuple(cls_to_downstream, all_warnings) |
| cls_to_downstream: Ordered list of filtered CLs to be downstreamed. |
| """ |
| filtered_cls = [] |
| cls_to_downstream = cls_to_downstream[: self.limit] |
| for change_num in cls_to_downstream: |
| if self.stop_at and self.stop_at == change_num: |
| logging.info( |
| "Matched change: %s, stop processing other changes", |
| change_num, |
| ) |
| break |
| filtered_cls.append(change_num) |
| return filtered_cls |
| |
| def _get_related_cls(self, change_number: str) -> List[str]: |
| """Get the list of related CLs for the passed in CL number. |
| |
| Args: |
| change_number: CL to find relationships of. |
| |
| Returns: |
| List of strings containing related CL numbers. |
| """ |
| return [ |
| x["_change_number"] |
| for x in self.gerrit_helper.GetRelatedChangesInfo(change_number)[ |
| "changes" |
| ] |
| if x["status"] == "NEW" |
| ] |
| |
| def _act_on_cls(self, cls_to_downstream: List[Dict]) -> None: |
| """Perform Gerrit updates on the CLs to downstream. |
| |
| Args: |
| cls_to_downstream: Ordered list of all CL candidates to be downstreamed. |
| """ |
| # TODO(b/278748163): Investigate bulk changes instead. |
| for i, change_num in enumerate(cls_to_downstream): |
| logging.info( |
| "Downstreaming %s: %d/%d", |
| change_num, |
| i + 1, |
| len(cls_to_downstream), |
| ) |
| |
| self.gerrit_helper.SetReviewers( |
| change=change_num, dryrun=self.dry_run, notify="NONE" |
| ) |
| self.gerrit_helper.SetReview( |
| change=change_num, |
| dryrun=self.dry_run, |
| # Add Verified label because client POST removes it. |
| labels={ |
| "Verified": "1", |
| "Code-Review": "2", |
| "Commit-Queue": "1" if self.cq_dry_run else "2", |
| }, |
| ) |
| |
| def _handle_checks_results(self, all_warnings: List[str]) -> int: |
| """Perform Gerrit updates on the CLs to downstream. |
| |
| Args: |
| all_warnings: A dictionary of lists of warning strings stating problems with these CLs. |
| Key: CL number with warnings |
| Value: List of warnings (str) found in the CL associated with the key. |
| If empty, that means there are no problems. |
| |
| Returns: |
| 0 if warnings are acceptable/ignored. |
| 1 if the script should exit due to warnings. |
| """ |
| if all_warnings: |
| for cl_num, warnings in all_warnings.items(): |
| logging.warning( |
| "http://crrev/c/%s Found warnings in change:\n\t%s", |
| cl_num, |
| "\n\t".join(warning for warning in warnings), |
| ) |
| if not self.ignore_warnings: |
| logging.error( |
| "Warnings detected in this run. Please address them.\n\t\tTo ignore the listed warnings, rerun with --ignore-warnings" |
| ) |
| return 1 |
| return 0 |
| |
| def cmd_downstream(self): |
| """Downstream copybot project CLs.""" |
| |
| copybot_downstream_cls = self._find_cls_to_downstream() |
| |
| if not copybot_downstream_cls: |
| logging.info("No %s CLs to downstream!", self.project) |
| return 0 |
| |
| all_warnings = defaultdict(list) |
| cls_to_downstream = [] |
| full_cl_list = [] |
| |
| for change in copybot_downstream_cls: |
| logging.debug("Top level Change: %s", change["_number"]) |
| if change["_number"] not in cls_to_downstream: |
| cls_to_downstream.append(change["_number"]) |
| full_cl_list.append(change) |
| logging.debug("\tAdded to list") |
| for related_change_number in self._get_related_cls( |
| change["_number"] |
| ): |
| logging.debug("\trelated Change: %s", related_change_number) |
| if related_change_number in cls_to_downstream: |
| logging.debug( |
| "\t\tSkipping this one since it already exists" |
| ) |
| continue |
| full_cl_list.append( |
| self.gerrit_helper.GetChangeDetail( |
| related_change_number, verbose=True |
| ) |
| ) |
| cls_to_downstream.append(related_change_number) |
| |
| for change in full_cl_list: |
| warnings = self._check_cl(change) |
| if warnings: |
| all_warnings[change["_number"]] = warnings |
| |
| # CL dependencies come in with newest first, so do reverse. |
| cls_to_downstream.reverse() |
| result = self._handle_checks_results(all_warnings) |
| if result != 0: |
| return result |
| cls_to_downstream = self._filter_cls(cls_to_downstream) |
| logging.info( |
| "Downstreaming the following CLs:\n%s", |
| "\n".join(str(change_num) for change_num in cls_to_downstream), |
| ) |
| |
| self._act_on_cls(cls_to_downstream) |
| |
| logging.info("All finished! Remember to monitor the CQ!") |
| return 0 |
| |
| def cmd_clear_attention(self): |
| """Remove user from attention set on merged CLs.""" |
| |
| cls_to_modify = self.gerrit_helper.Query( |
| topic=f"{self.project}-downstream", |
| status="merged", |
| attention="me", |
| raw=True, |
| ) |
| cls_to_modify.sort(key=lambda patch: patch["number"]) |
| |
| if self.limit: |
| cls_to_modify = cls_to_modify[: self.limit] |
| |
| counter = 0 |
| for cl in cls_to_modify: |
| logging.info( |
| "Updating attention set on CL %s (%d/%d)", |
| cl["number"], |
| counter + 1, |
| len(cls_to_modify), |
| ) |
| |
| self.gerrit_helper.SetAttentionSet( |
| change=cl["number"], |
| remove=("me",), |
| dryrun=self.dry_run, |
| notify="NONE", |
| ) |
| counter += 1 |
| |
| if self.stop_at and self.stop_at == cl["number"]: |
| break |
| |
| logging.info("Total CLs affected: %d", counter) |
| return 0 |
| |
| def run(self, cmd): |
| if cmd is None: |
| return self.cmd_downstream() |
| if cmd == "clear_attention": |
| return self.cmd_clear_attention() |
| |
| logging.error("Unknown subcommand %s", cmd) |
| return 1 |
| |
| |
| def main(args): |
| """Main entry point for CLI.""" |
| parser = downstream_argparser.generate_copybot_arg_parser() |
| opts = parser.parse_args(args) |
| CopybotDownstream( |
| opts.project, |
| opts.dry_run, |
| opts.cq_dry_run, |
| opts.limit, |
| opts.stop_at, |
| opts.ignore_warnings, |
| ).run(opts.cmd) |