"""Release script invoked from git trigger upon submission of changes to release-versions.yaml config file to the cos/tools GoB repo
Parses contents of release-versions.yaml file and copies release candidates to
gcr.io/cos-tools
"""
from datetime import datetime
import json
import os
import re
import subprocess
import sys
import time
import yaml
_VULN_SCANNING_TAG_PREFIX = "public-image-"
_VULN_SCANNING_DEPR_TAG_PREFIX = "no-new-use-public-image-"
_COS_TOOLBOX_STAGING_NAME = "toolbox"
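# A release-versions.yaml entry is expected to look like the following
# (illustrative values; the required keys are the ones enforced by
# validate_config below):
#
#   - staging_container_name: cos-gpu-installer
#     release_container_name: cos-gpu-installer
#     build_tag: v2.1.10
#     release_tags:
#       - v2.1.10
#       - latest
#     multi-arch: true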
def validate_config(release_config):
for release_container in release_config:
for key in [
"staging_container_name",
"release_container_name",
"build_tag",
"release_tags",
"multi-arch",
]:
assert key in release_container, "missing {} in entry {}".format(
key, release_container
)
def validate_src_gcr_path(path):
  # path format: us-docker.pkg.dev/cos-infra-prod/gcr-io-staging
  parts = path.split("/")
  return (
      len(parts) == 3
      and len(parts[0]) > len("docker.pkg.dev/")
      and len(parts[1]) != 0
      and parts[2].endswith("gcr-io-staging")
  )
def validate_dst_gcr_path(path):
  # path format: us-docker.pkg.dev/cos-cloud/us.gcr.io
  parts = path.split("/")
  return (
      len(parts) == 3
      and len(parts[0]) > len("docker.pkg.dev/")
      and len(parts[1]) != 0
      and parts[2].endswith("gcr.io")
  )
def parse_gcr_path(gcr_path):
"""Parses a valid Google Container Registry path into its location, project_id, and repository.
Args:
gcr_path: A valid GCR path string.
Returns:
A tuple containing (location, project_id, repository_name).
Returns None if the path is invalid or doesn't contain a repository.
"""
match = re.match(
r"([a-z0-9-]+)-docker\.pkg\.dev\/([a-z0-9-]+)\/?([a-z0-9\.-]+)", gcr_path
)
if match:
location, project_id, repository = match.groups()
print(
f"Matched groups: (location='{location}', project_id='{project_id}',"
f" repository='{repository}') (path: {gcr_path})"
)
return location, project_id, repository
else:
print(f"No match found for path: '{gcr_path}'")
return None
def call_promote_api(
promote_url, auth_header, content_type_header, promote_data, dry_run=False
):
"""Calls the Artifact Registry promote API and polls the promote results.
Args:
promote_url: The URL of the promote API endpoint.
auth_header: The authorization header for the API request.
content_type_header: The content type header for the API request.
promote_data: A dictionary containing the data to be sent in the API
request.
dry_run: If true, the actual promote command will not be run.
"""
curl_command = [
"curl",
"-H",
auth_header,
"-H",
content_type_header,
"-X",
"POST",
promote_url,
"-d",
json.dumps(promote_data), # Ensure promote_data is properly JSON encoded
]
if dry_run:
print("\nDry-run mode enabled. The following command would be executed:")
print(" ".join(curl_command))
print("\nNo actual API call was made.")
return
# Execute the curl command to call the promote API
print("\nExecuting the promote API call...")
result = subprocess.run(
curl_command, text=True, capture_output=True, check=True
)
print("\nPromote API Response:")
print(result.stdout)
if result.stderr:
print("Promote API Error:")
print(result.stderr)
  # Extract the dst_project_id, dst_location, and operation_id from the response.
pattern_regex = r'"name":\s*"projects/([a-zA-Z0-9-]+)/locations/([a-zA-Z-]+)/operations/([a-zA-Z0-9-]+)"'
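  # Illustrative response snippet this regex is expected to match:
  #   "name": "projects/cos-cloud/locations/us/operations/some-operation-id"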
match = re.search(pattern_regex, result.stdout)
if not match:
print(
"Error: Unable to extract destination project ID, location, and"
" operation ID from the response."
)
return
dst_project_id = match.group(1)
dst_location = match.group(2)
operation_id = match.group(3)
print(
f"Dest Project ID: {dst_project_id}; Dest Location: {dst_location};"
f" Direct Operation ID: {operation_id}"
)
# Check if the promote operation was successful with retry logic
max_retries = 10
for i in range(max_retries):
print(
f"\nChecking promote operation status (Attempt {i+1}/{max_retries})..."
)
query_result_url = f"https://artifactregistry.googleapis.com/v1/projects/{dst_project_id}/locations/{dst_location}/operations/{operation_id}"
query_result_command = [
"curl",
"-H",
auth_header,
"-H",
content_type_header,
query_result_url,
]
result = subprocess.run(
query_result_command, text=True, capture_output=True, check=True
)
print("\nCheck Promote Result Response:")
print(result.stdout)
if result.stderr:
print("Check Promote Result Error:")
print(result.stderr)
try:
response_json = json.loads(result.stdout)
if response_json.get("done"):
if "error" in response_json:
error_code = response_json["error"].get("code")
error_message = response_json["error"].get("message")
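          # Note: google.rpc status code 6 is ALREADY_EXISTS.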
if error_code == 6:
print(
"Promote operation completed with a known error (code 6 entity"
f" already exists): {error_message}. Proceeding."
)
return
else:
raise RuntimeError(
f"Promote operation failed with error code {error_code}:"
f" {error_message}"
)
elif "response" in response_json:
print("Promote operation successful!")
return
else:
raise RuntimeError(
"Promote operation completed but neither 'response' nor 'error'"
" field found."
)
else:
print("Promote operation not yet done. Retrying in 30 seconds...")
time.sleep(30)
    except json.JSONDecodeError:
      print(
          "Error: Could not decode JSON from promote status response."
          " Retrying in 30 seconds..."
      )
      time.sleep(30)
    except RuntimeError:
      raise  # Re-raise the error to stop the process if it's fatal.
raise RuntimeError(
f"Promote operation did not complete after {max_retries} retries."
)
def get_image_digest(image_name_with_tag):
"""Retrieves the digest of a container image from Artifact Registry.
Args:
image_name_with_tag: The full name of the container image with its tag
(e.g., "us-docker.pkg.dev/cos-cloud/gcr.io/cos-gpu-installer:v1.6.1").
Returns:
The digest of the image as a string.
Raises:
    RuntimeError: If no digest is found for the image.
    subprocess.CalledProcessError: If the describe command fails.
"""
print(f"Info: Finding digest for {image_name_with_tag}")
result = subprocess.run(
[
"gcloud",
"container",
"images",
"describe",
image_name_with_tag,
"--format",
"value(image_summary.digest)",
"-q",
],
capture_output=True,
text=True,
check=True,
)
digest = result.stdout.strip()
if not digest:
print(f"Info: Digest not found for {image_name_with_tag}")
raise RuntimeError(f"Digest not found for {image_name_with_tag}")
return digest
def check_image_tag_exists(image_name_with_tag):
"""Checks if a given image tag exists in Artifact Registry.
Args:
image_name_with_tag: The full name of the container image with its tag
(e.g., "us-docker.pkg.dev/cos-cloud/gcr.io/cos-gpu-installer:v1.6.1").
Returns:
True if the tag exists (a digest is found), False otherwise.
"""
try:
get_image_digest(image_name_with_tag)
return True
except Exception:
return False
def move_matching_gcr_location_to_front(src_gcr_path, target_gcr_paths):
"""Moves the first GCR path in the target_gcr_paths list that has the
same location (parsed using parse_gcr_path) as the src_gcr_path
to the beginning of the list(target_gcr_paths).
Args:
src_gcr_path: The source GCR path string.
Example: "us-docker.pkg.dev/cos-infra-prod/gcr-io-staging"
target_gcr_paths: A list of GCR path strings. This list will be modified
in-place.
Example: [ "eu-docker.pkg.dev/some-project/eu-repo",
"us-central1-docker.pkg.dev/another/us-image",
"asia-docker.pkg.dev/third/asia-image",
"us-docker.pkg.dev/cos-cloud/us.gcr.io" ]
Raises:
ValueError: If the source GCR path cannot be parsed.
ValueError: If no destination GCR path with a matching location is found.
Returns:
None. The target_gcr_paths list is modified directly.
"""
  # parse_gcr_path returns None (rather than raising) on invalid input, so the
  # failure case must be checked explicitly.
  src_location_tuple = parse_gcr_path(src_gcr_path)
  if src_location_tuple is None:
    raise ValueError(f"Error parsing source GCR path: '{src_gcr_path}'")
  src_location = src_location_tuple[0]
  match_index = -1
  for i, dst_gcr_path in enumerate(target_gcr_paths):
    dst_location_tuple = parse_gcr_path(dst_gcr_path)
    if dst_location_tuple is None:
      # If a destination path is invalid, skip it and continue searching.
      print(f"Warning: Skipping invalid destination GCR path: '{dst_gcr_path}'")
      continue
    if dst_location_tuple[0] == src_location:
      match_index = i
      break
if match_index != -1:
if match_index != 0:
matching_path = target_gcr_paths.pop(match_index)
target_gcr_paths.insert(0, matching_path)
else:
raise ValueError(
f"No destination GCR path with location '{src_location}' found."
)
def copy_container_image_with_gcrane(
src_path: str, dst_path: str, dry_run: bool = False
):
"""Copies a container image tag using `gcrane cp src dst`.
Args:
src_path: The source image path (e.g.,
us-docker.pkg.dev/cos-cloud/gcr.io/cos-gpu-installer:v1.6.1).
dst_path: The destination image path (e.g.,
asia-docker.pkg.dev/cos-cloud/asia.gcr.io/cos-gpu-installer:v1.6.1).
dry_run: If True, only prints the gcloud command instead of executing it.
"""
if check_image_tag_exists(src_path):
command = ["gcrane", "cp", src_path, dst_path]
if dry_run:
print(f"Dry-run: {' '.join(command)}")
else:
print(f"Executing command: {' '.join(command)}")
result = subprocess.run(command, capture_output=True, text=True)
print(f"Stdout: {result.stdout}")
if result.stderr:
print(f"Stderr: {result.stderr}")
result.check_returncode() # Raise an exception for non-zero exit codes
  else:
    print(
        f"Warning: Source image tag {src_path} does not exist. Skipping copy"
        f" to {dst_path}."
    )
def copy_container_image_cross_region(
src_bucket,
dst_bucket,
src_container_name,
dst_container_name,
release_tags,
is_multi_arch,
dry_run=False,
):
"""Copies container image tags across Google Container Registry regions.
This function should only be called between production environments
with release tag(s) already existing. Usually after the MOSS promote
artifact API has already been called between the same region.
Args:
src_bucket: The source GCR bucket path (e.g.,
us-docker.pkg.dev/cos-cloud/gcr.io).
dst_bucket: The destination GCR bucket path (e.g.,
asia-docker.pkg.dev/cos-cloud/asia.gcr.io).
src_container_name: The name of the container image in the source repository
(e.g., cos-gpu-installer).
dst_container_name: The name to use for the container image in the
destination repository (e.g., cos-gpu-installer).
release_tags: A list of release tags to copy (e.g., ["v1.0.0", "stable"]).
is_multi_arch: A boolean indicating if the image is multi-architecture.
dry_run: If True, only prints the gcloud commands instead of executing them.
"""
src_location, src_project_id, source_repo = parse_gcr_path(src_bucket)
dst_location, dst_project_id, prod_repo = parse_gcr_path(dst_bucket)
src_full_image_name = f"{src_location}-docker.pkg.dev/{src_project_id}/{source_repo}/{src_container_name}"
dst_full_image_name = f"{dst_location}-docker.pkg.dev/{dst_project_id}/{prod_repo}/{dst_container_name}"
for release_tag in release_tags:
copy_container_image_with_gcrane(
src_full_image_name + ":" + release_tag,
dst_full_image_name + ":" + release_tag,
dry_run,
)
def copy_container_image_within_same_region(
src_bucket,
dst_bucket,
staging_container_name,
release_container_name,
build_tag,
release_tags,
is_multi_arch,
dry_run=False,
):
"""Copies a container image from a staging repository to a release repository
using Artifact Registry's promote functionality. Since MOSS does not support
cross region promotion, this function only supports promotion between same
region.
Args:
src_bucket: The source GCR bucket path.
dst_bucket: The destination GCR bucket path.
staging_container_name: The name of the container in the staging repository.
release_container_name: The name of the container in the release repository.
build_tag: The tag of the image to promote in the source repository.
release_tags: A list of tags to apply to the promoted image in the
destination repository.
is_multi_arch: A boolean indicating if the source image is a
multi-architecture image with 'BUILD_ID_amd64' and 'BUILD_ID_arm64' tags.
"""
src_location, src_project_id, source_repo = parse_gcr_path(src_bucket)
dst_location, dst_project_id, prod_repo = parse_gcr_path(dst_bucket)
src_full_image_name = f"{src_location}-docker.pkg.dev/{src_project_id}/{source_repo}/{staging_container_name}"
dst_full_image_name = f"{dst_location}-docker.pkg.dev/{dst_project_id}/{prod_repo}/{release_container_name}"
# Check if the release tag already exists.
for release_tag in release_tags:
if release_tag != "latest":
if check_image_tag_exists(f"{dst_full_image_name}:{release_tag}"):
        print(
            f"Release tag already exists: {dst_full_image_name}:{release_tag};"
            " skipping copy."
        )
if "latest" in release_tags:
print(f"Attaching latest tag to {release_tag}...")
copy_container_image_with_gcrane(
f"{dst_full_image_name}:{release_tag}",
f"{dst_full_image_name}:latest",
)
return
if src_location == dst_location:
# If it's the same region, do the promotion.
main_image_with_tag = f"{src_full_image_name}:{build_tag}"
latest_digest = get_image_digest(main_image_with_tag)
# Get the access token
try:
token_process = subprocess.run(
["gcloud", "auth", "print-access-token"],
capture_output=True,
text=True,
check=True,
)
access_token = token_process.stdout.strip()
except subprocess.CalledProcessError as e:
raise Exception(f"Error getting access token: {e}")
promote_url = f"https://artifactregistry.googleapis.com/v1/projects/{dst_project_id}/locations/{dst_location}/repositories/{prod_repo}:promoteArtifact"
auth_header = f"Authorization: Bearer {access_token}"
content_type_header = "Content-Type: application/json"
if is_multi_arch:
amd64_image_with_tag = f"{src_full_image_name}:{build_tag}_amd64"
latest_amd64_digest = get_image_digest(amd64_image_with_tag)
arm64_image_with_tag = f"{src_full_image_name}:{build_tag}_arm64"
latest_arm64_digest = get_image_digest(arm64_image_with_tag)
promote_data_amd64 = {
"source_repository": (
f"projects/{src_project_id}/locations/{src_location}/repositories/{source_repo}"
),
"source_version": (
f"projects/{src_project_id}/locations/{src_location}/repositories/{source_repo}/packages/{staging_container_name}/versions/{latest_amd64_digest}"
),
"attachment_behavior": "INCLUDE",
}
# Promote amd64 images
call_promote_api(
promote_url,
auth_header,
content_type_header,
promote_data_amd64,
dry_run,
)
promote_data_arm64 = {
"source_repository": (
f"projects/{src_project_id}/locations/{src_location}/repositories/{source_repo}"
),
"source_version": (
f"projects/{src_project_id}/locations/{src_location}/repositories/{source_repo}/packages/{staging_container_name}/versions/{latest_arm64_digest}"
),
"attachment_behavior": "INCLUDE",
}
      # Promote arm64 images
call_promote_api(
promote_url,
auth_header,
content_type_header,
promote_data_arm64,
dry_run,
)
promote_data_main = {
"source_repository": (
f"projects/{src_project_id}/locations/{src_location}/repositories/{source_repo}"
),
"source_version": (
f"projects/{src_project_id}/locations/{src_location}/repositories/{source_repo}/packages/{staging_container_name}/versions/{latest_digest}"
),
"attachment_behavior": "INCLUDE",
}
call_promote_api(
promote_url,
auth_header,
content_type_header,
promote_data_main,
dry_run,
)
  else:
    raise Exception("Cannot promote artifacts across different regions.")
# Tag the promoted images
print("Waiting 30s before tagging promoted main image...")
time.sleep(30)
  for release_tag in release_tags:
    copy_container_image_with_gcrane(
        f"{dst_full_image_name}@{latest_digest}",
        f"{dst_full_image_name}:{release_tag}",
        dry_run,
    )
# Add a tag so that toolbox is scanned for vulnerabilities. This tag tells the
# public image scanning pipeline to scan the image and create bugs.
def add_tag_for_vuln_scanning(dst_bucket, release_container_name, release_tags):
if release_container_name != _COS_TOOLBOX_STAGING_NAME:
return
  assert validate_dst_gcr_path(dst_bucket), (
      "cannot use address {}, only"
      " <location>-docker.pkg.dev/<project-name>/[<location>.]gcr.io"
      " addresses are supported".format(dst_bucket)
  )
dst_path = os.path.join(dst_bucket, release_container_name)
  # Check if we tagged any image with _VULN_SCANNING_TAG_PREFIX before.
tag_filter = "--filter=tags:{}".format(_VULN_SCANNING_TAG_PREFIX)
existing_tags = (
subprocess.run(
[
"gcloud",
"container",
"images",
"list-tags",
dst_path,
tag_filter,
"--format=json",
],
capture_output=True,
check=True,
)
.stdout.decode("utf-8")
.rstrip()
)
# If a new image is not getting released, do not update vuln scanning tags.
release_tag = release_tags[0]
  for rel_tag in release_tags:
    # Skip "latest" or any other non-unique tags.
    if not rel_tag.startswith("v"):
      continue
if rel_tag in existing_tags: # Not releasing a new image.
return
release_tag = rel_tag
break
now = datetime.now()
date_time = now.strftime("%m%d%Y-%H%M%S")
scanning_depr_tag = _VULN_SCANNING_DEPR_TAG_PREFIX + date_time
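  # e.g., "no-new-use-public-image-06152024-103000" for a run at 10:30:00 on
  # June 15, 2024 (illustrative).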
  # If no image is tagged with the "public-image-" tag yet, add it and return.
if existing_tags == "[]":
subprocess.run(
[
"gcloud",
"container",
"images",
"add-tag",
dst_path + ":" + release_tag,
dst_path + ":" + _VULN_SCANNING_TAG_PREFIX,
"-q",
],
check=True,
)
return
# Use current date-time as a suffix to make deprecation tag unique.
subprocess.run(
[
"gcloud",
"container",
"images",
"add-tag",
dst_path + ":" + _VULN_SCANNING_TAG_PREFIX,
dst_path + ":" + scanning_depr_tag,
"-q",
],
check=True,
)
subprocess.run(
[
"gcloud",
"container",
"images",
"add-tag",
dst_path + ":" + release_tag,
dst_path + ":" + _VULN_SCANNING_TAG_PREFIX,
"-q",
],
check=True,
)
def verify_and_release(src_bucket, dst_buckets, release, dry_run=False):
with open("release/release-versions.yaml", "r") as file:
try:
release_config = yaml.safe_load(file)
validate_config(release_config)
if release:
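        # dst_buckets arrives as a "^"-separated string, e.g. (illustrative):
        #   "us-docker.pkg.dev/cos-cloud/us.gcr.io^europe-docker.pkg.dev/cos-cloud/eu.gcr.io"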
dst_buckets = dst_buckets.split("^")
assert validate_src_gcr_path(src_bucket), (
f"cannot use address {src_bucket}, only"
" <location>-docker.pkg.dev/<project-name>/<repository-name>/"
" addresses are supported"
)
for dst_bucket in dst_buckets:
assert validate_dst_gcr_path(dst_bucket), (
f"cannot use address {dst_bucket}, only"
" <location>-docker.pkg.dev/<project-name>/<repository-name>/"
" addresses are supported"
)
        # We need to make sure dst_buckets[0] is the bucket in the same region
        # as src_bucket. Since MOSS does not accept cross-region promotion, we
        # first promote the artifact within the same region and then use
        # `gcrane cp src dst` to fan out to other regions.
move_matching_gcr_location_to_front(src_bucket, dst_buckets)
print(f"Destination Buckets (reordered): {dst_buckets}")
for release_container in release_config:
staging_container_name = release_container["staging_container_name"]
release_container_name = release_container["release_container_name"]
build_tag = release_container["build_tag"]
release_tags = release_container["release_tags"]
is_multi_arch = release_container["multi-arch"]
for i in range(len(dst_buckets)):
# Since we called move_matching_gcr_location_to_front, we know that
# dst_buckets[0] is guaranteed to be the same region as src_bucket.
if i == 0:
# Same region promotion.
copy_container_image_within_same_region(
src_bucket,
dst_buckets[0],
staging_container_name,
release_container_name,
build_tag,
release_tags,
is_multi_arch,
dry_run,
)
else:
              # Cross-region copy. Since the artifact has already been promoted
              # to prod, we simply copy it to the other prod regions (e.g.,
              # cos-cloud).
copy_container_image_cross_region(
dst_buckets[0],
dst_buckets[i],
release_container_name,
release_container_name,
release_tags,
is_multi_arch,
dry_run,
)
# Disable until b/383527770 is resolved.
# add_tag_for_vuln_scanning(src_bucket, staging_container_name, build_tag)
except yaml.YAMLError as ex:
raise Exception("Invalid YAML config: %s" % str(ex))
def main():
dry_run = False
if len(sys.argv) == 2 and sys.argv[1] == "--verify":
verify_and_release("", "", False, dry_run)
elif len(sys.argv) == 3:
src_bucket = sys.argv[1]
dst_buckets = sys.argv[2]
verify_and_release(src_bucket, dst_buckets, True, dry_run)
else:
sys.exit(
"sample use: ./release_script <source_gcr_path> <destination_gcr_paths>"
" \n example use: ./release_script"
" us-docker.pkg.dev/cos-infra-prod/gcr-io-staging"
" us-docker.pkg.dev/cos-cloud/us.gcr.io^europe-docker.pkg.dev/cos-cloud/eu.gcr.io"
)
if __name__ == "__main__":
main()