| """ |
| Release-note-cve-verifier is invoked from git trigger for every CL. |
| It verifies that CVEs listed in the release note are fixed. |
| This ensures that release CL will not be submitted with a wrong release note. |
| """ |
| |
| import sys |
| import yaml |
| import subprocess |
| import os |
| import git |
| import re |
| import logging |
| from google.cloud.devtools import containeranalysis_v1 |
| |
| def validate_config(release_config): |
| for release_container in release_config: |
| for key in ["staging_container_name", "release_container_name", "build_commit", "release_tags"]: |
| assert key in release_container, "missing {} in entry {}".format(key, release_container) |
| |
| def validate_src_gcr_path(path): |
| # path format: gcr.io/cos-infra-prod |
| return len(path) > len("gcr.io/") and path[:len("gcr.io/")] == "gcr.io/" |
| |
| def verify_release_note(src_bucket): |
| assert validate_src_gcr_path(src_bucket), "cannot use address {}, only gcr.io/ addresses are supported".format(src_bucket) |
| with open('release/release-versions.yaml', 'r') as file: |
| try: |
| release_config = yaml.safe_load(file) |
| validate_config(release_config) |
| |
| # Get the project id from src bucket. |
| # We already verified the format of the src bucket. |
| project_id = src_bucket[len("gcr.io/"):] |
| cves = release_note_cves() |
| logging.info("CVEs listed on the release notes: %s", cves) |
| verify_result = True |
| for release_container in release_config: |
| staging_container_name = release_container["staging_container_name"] |
| build_tag = release_container["build_commit"] |
| src_path = os.path.join(src_bucket, staging_container_name) |
| container_tag_url = src_path + ":" + build_tag |
| # We need digest URL for occurences. |
| digest = container_digest(container_tag_url) |
| container_digest_url = "https://" + src_path + "@" + digest |
| logging.info("container URL: %s", container_digest_url) |
| if not verify_fixed_cves(project_id, container_digest_url, cves): |
| verify_result = False |
| return verify_result |
| |
| except yaml.YAMLError as ex: |
| raise Exception("Invalid YAML config: %s" % str(ex)) |
| |
| def container_digest(container_url): |
| """Returns digest of the container.""" |
| # contaiener_url = 'gcr.io/my-project/my-image:abc' |
| |
| digest = subprocess.run( |
| ['gcloud', 'container', 'images', 'describe', container_url, '--format', 'value(image_summary.digest)'], |
| capture_output = True, |
| check = True |
| ) |
| return digest.stdout.decode('utf-8').strip('\"').rstrip() |
| |
| def verify_fixed_cves(project_id, container_digest_url, cves): |
| """Retrieves all the occurrences associated with a specified image. |
| Returns false if any of the cves among the occurrences. |
| Otherwise, return true.""" |
| # container_digest_url = 'https://gcr.io/my-project/my-image@sha256:123' |
| # project_id = 'my-gcp-project' |
| # cves = [CVE-2020-1111, CVE-2021-1111] |
| |
| client = containeranalysis_v1.ContainerAnalysisClient() |
| grafeas_client = client.get_grafeas_client() |
| project_name = f"projects/{project_id}" |
| for cve in cves: |
| filter_str = f'kind="VULNERABILITY" AND resourceUrl="{container_digest_url}" AND noteId="{cve}"' |
| logging.info(filter_str) |
| occurrences = grafeas_client.list_occurrences(parent=project_name, filter=filter_str) |
| for o in occurrences: |
| logging.info("%s has not been fixed in the %s", cve, container_digest_url) |
| return False |
| logging.info("%s does not affect the image", cve) |
| return True |
| |
| def commit_message(): |
| """Returns the last commit message""" |
| repo = git.Repo(os.getcwd()) |
| main = repo.head.reference |
| return main.commit.message |
| |
| def release_note_cves(): |
| """Finds the CVEs listed in the release note of the commit message.""" |
| commit_msg = commit_message() |
| cves = [] |
| reg = re.compile('CVE-\d{4}-\d{4,7}') |
| lines = commit_msg.splitlines() |
| for i in range(len(lines)): |
| line = lines[i] |
| # Find start of RELEASE_NOTE and scan the following lines until |
| # there is an empty line. |
| if line.startswith('RELEASE_NOTE='): |
| while line: #Until we hit to empty line |
| cves = cves + reg.findall(line) |
| i = i + 1 |
| line = lines[i].strip() |
| return cves |
| |
| def main(): |
| logging.basicConfig(stream=sys.stdout, level=logging.INFO) |
| src_bucket = sys.argv[1] |
| logging.info("source bucket: %s", src_bucket) |
| if not verify_release_note(src_bucket): |
| sys.exit("Release note verification failed. Check the logs above for details.") |
| |
| if __name__ == '__main__': |
| main() |