blob: c9f47987a2f6ef176b81d0262e7081b946669208 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import logging
import os
import re
import shlex
import shutil
import subprocess
from subprocess import Popen, PIPE
import sys
import time
import uuid
import json
from google.cloud import storage
from google.api_core import exceptions as cloud_exceptions
# Appends the third_party.autotest and src paths so that the script can import
# libraries under these paths.
sys.path.append('/mnt/host/source/src/platform/moblab/src')
# pylint: disable=no-name-in-module, import-error
import common
from tko import job_serializer, models, parser_lib
from moblab_common import pubsub_client
CURRENT_TIMESTAMP = int(time.time())
FAKE_JOB_ID = CURRENT_TIMESTAMP
CONTROL_FILE = "control"
JOB_SERIALIZE_FILE = "job.serialize"
STATUS_FILE = "status"
KEYVAL_FILE = "keyval"
NEW_KEYVAL_FILE = "new_keyval"
UPLOADED_STATUS_FILE = ".uploader_status"
STATUS_GOOD = "PUBSUB_SENT"
FAKE_MOBLAB_ID_FILE = "fake_moblab_id_do_not_delete.txt"
GIT_HASH_FILE = "git_hash.txt"
GIT_COMMAND = ("git log --pretty=format:'%h -%d %s (%ci) <%an>'"
" --abbrev-commit -20")
AUTOTEST_DIR = "/mnt/host/source/src/third_party/autotest/files/"
DEFAULT_SUITE_NAME = "default_suite"
SUITE_NAME_REGEX = "Fetching suite for suite named (.+?)\.\.\."
DEBUG_FILE_PATH = "debug/test_that.DEBUG"
CONFIG_DIR = os.path.dirname(os.path.abspath(__file__)) + "/config/"
DEFAULT_BOTO_CONFIG = CONFIG_DIR + ".boto_upload_utils"
UPLOAD_CONFIG = CONFIG_DIR + "upload_config.json"
SERVICE_ACCOUNT_CONFIG = CONFIG_DIR + ".service_account.json"
def parse_arguments(argv):
"""Creates the argument parser.
Args:
argv: A list of input arguments.
Returns:
A parser object for input arguments.
"""
parser = argparse.ArgumentParser(description=__doc__)
subparsers = parser.add_subparsers(
help='select sub option for test result utility',
dest='subcommand')
parser.add_argument("-v",
"--verbose",
action='store_true',
help="Enable verbose (debug) logging.")
def_logfile = "/tmp/" + os.path.basename(
sys.argv[0].split(".")[0]) + ".log"
parser.add_argument("-l",
"--logfile",
type=str,
required=False,
default=def_logfile,
help="Full path to logfile. Default: " + def_logfile)
# configuration subcommand to create config file and populate environment
config_parser = subparsers.add_parser(name="config",
help='upload test results to CPCon')
config_parser.add_argument(
"-b",
"--bucket",
type=str,
required=True,
help="The GCS bucket that test results are uploaded to, e.g."
"'gs://xxxx'.")
config_parser.add_argument("-f",
"--force",
dest='force',
action="store_true",
help="Force overwrite of previous config files")
upload_parser = subparsers.add_parser(name="upload",
help='upload test results to CPCon')
upload_parser.add_argument(
"--bug",
type=_valid_bug_id,
required=False,
help=
"Write bug id to the test results. Each test entry can only have "
"at most 1 bug id. Optional.")
upload_parser.add_argument(
"-d",
"--directory",
type=str,
required=True,
nargs='+',
help="The directory of non-Moblab test results.")
upload_parser.add_argument(
"--dry_run",
action='store_true',
help="Generate job.serialize locally but do not upload test "
"directories and not send pubsub messages.")
upload_parser.add_argument(
"-s",
"--suite",
type=str,
default=None,
help="The suite is used to identify the type of test results,"
"e.g. 'power' for platform power team. If not specific, the "
"default value is 'default_suite'.")
return parser.parse_args(argv)
def fetch_test_dirs(parent_dir, test_dirs):
""" Gets all test directories.
Args:
parent_dir: The parent directory of one or multiple test directories
test_dirs: The output set of test directories.
"""
if not os.path.exists(parent_dir) or not os.path.isdir(parent_dir):
logging.warning('Test directory does not exist: %s' % parent_dir)
return
control_file = os.path.join(parent_dir, CONTROL_FILE)
status_file = os.path.join(parent_dir, STATUS_FILE)
if os.path.exists(control_file) and os.path.exists(status_file):
test_dirs.add(parent_dir)
return
for dir_name in os.listdir(parent_dir):
subdir = os.path.join(parent_dir, dir_name)
if os.path.isdir(subdir):
fetch_test_dirs(subdir, test_dirs)
def parse_test_job(test_dir, job_keyval, job_id, suite):
"""
Parses test results and get the job object for the given test directory.
The job object will be used to generate job.serialize file.
Args:
test_dir: The test directory for non-moblab test results.
job_keyval: The key-value object of the job.
job_id: A job_id.
suite: A suite name.
Returns:
The job object or None if build version not found.
"""
logging.info("Start to parse the test job for: %s", test_dir)
# Looks up the status version and hostname
status_version = job_keyval.get("status_version", 0)
parser = parser_lib.parser(status_version)
status_log_path = _find_status_log_path(test_dir)
if not status_log_path:
logging.warning('No status log file exists in: %s', test_dir)
return None
job = parser.make_job(test_dir)
# workaround for CPCon pipeline, "board" is mandatory
job.board = job.machine_group
job.tests = _get_job_tests(parser, job, status_log_path)
# The job id and suite name are necessary for CPCon pipeline. Since
# non-moblab test results don't have these fields, the values here are fake.
# Set a different afe_paren_job_id so that the test won't be regarded as a
# suite test.
job.afe_job_id = job_id
job.afe_parent_job_id = job_id + "1"
job.suite = suite
job.build_version = _get_build_version(job.tests)
job.build = job.build_version
if not job.build_version:
logging.warning('Failed to get build version in: %s', test_dir)
return None
logging.info("Successfully parsed the job.serialize for: %s", test_dir)
return job
def generate_job_serialize(test_dir, job, job_keyval):
"""
Generate the job.serialize for the given test directory, job object and
job key-value object.
Args:
test_dir: The test directory for non-moblab test results.
job: A job object.
job_keyval: The key-value object of the job.
"""
job_serialize_file = os.path.join(test_dir, JOB_SERIALIZE_FILE)
logging.info("Start to generate the job.serialize for: %s",
job_serialize_file)
hostname = job_keyval.get('hostname', '0.0.0.0') # ip address
job_name = "%s-moblab/%s" % (job.afe_job_id, hostname)
serializer = job_serializer.JobSerializer()
serializer.serialize_to_binary(job, job_name, job_serialize_file)
logging.info("Successfully generated the job.serialize for: %s",
job_serialize_file)
def is_pubsub_sent(test_dir):
"""
Checks if the message for the uploaded bucket has been sent.
Args:
test_dir: The test directory for non-moblab test results.
"""
upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE)
if not os.path.exists(upload_status_file):
logging.debug("The upload status file %s does not exist.",
upload_status_file)
return False
with open(upload_status_file, "r") as upload_status:
if upload_status.read() == STATUS_GOOD:
logging.warn(
"The test directory: %s status has already been "
"sent to CPCon and the .upload_status file has "
"been set to PUBSUB_SENT.", test_dir)
return True
else:
logging.debug("The pubsub message was not successful")
return False
def upload_test_results(bucket, test_dir, job_keyval, moblab_id, job_id):
"""
Upload the test directory with job.serialize to GCS bucket.
Args:
args: A list of input arguments.
test_dir: The test directory for non-moblab test results.
job_keyval: The key-value object of the job.
moblab_id: A string that represents the unique id of a moblab device.
job_id: A job id.
"""
upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE)
with open(upload_status_file, "w") as upload_status:
upload_status.write("UPLOADING")
fake_moblab_id = moblab_id
fake_moblab_install_id = moblab_id
hostname = job_keyval.get('hostname', '0.0.0.0') # ip address
gcs_bucket_path = os.path.join("gs://%s" % bucket, "results",
fake_moblab_id, fake_moblab_install_id,
"%s-moblab" % job_id, hostname)
try:
logging.info(
"Start to upload test directory: %s to GCS bucket path: %s",
test_dir, gcs_bucket_path)
with open(upload_status_file, "w") as upload_status:
upload_status.write("UPLOADED")
cmd = "gsutil -m cp -r %s %s" % (test_dir, gcs_bucket_path)
subprocess.check_output(shlex.split(cmd))
logging.info(
"Successfully uploaded test directory: %s to GCS bucket path: %s",
test_dir, gcs_bucket_path)
except Exception as e:
with open(upload_status_file, "w") as upload_status:
upload_status.write("UPLOAD_FAILED")
raise Exception("Failed to upload test directory: %s to GCS bucket "
"path: %s for the error: %s" %
(test_dir, gcs_bucket_path, e))
def send_pubsub_message(test_dir, bucket, moblab_id, job_id):
"""
Send pubsub messages to trigger CPCon pipeline to process non-moblab
test results in the specific GCS bucket path.
Args:
bucket: The GCS bucket.
moblab_id: A moblab id.
job_id: A job id.
"""
moblab_install_id = moblab_id
console_client = pubsub_client.PubSubBasedClient()
gsuri = "gs://%s/results/%s/%s/%s-moblab" % (bucket, moblab_id,
moblab_install_id, job_id)
try:
logging.info("Start to send the pubsub message to GCS path: %s", gsuri)
message_id = \
console_client.send_test_job_offloaded_message(gsuri,
moblab_id,
moblab_install_id)
upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE)
with open(upload_status_file, "w") as upload_status:
upload_status.write(STATUS_GOOD)
logging.info(
"Successfully sent the pubsub message with message id: %s to GCS "
"path: %s", message_id[0], gsuri)
except Exception as e:
raise Exception("Failed to send the pubsub message with moblab id: %s "
"and job id: %s to GCS path: %s for the error: %s" %
(moblab_id, job_id, gsuri, e))
def _find_status_log_path(test_dir):
log_path = os.path.join(test_dir, "status.log")
if os.path.exists(log_path):
return log_path
log_path = os.path.join(test_dir, "status")
if os.path.exists(log_path):
return log_path
return ""
def _get_job_tests(parser, job, status_log_path):
status_lines = open(status_log_path).readlines()
parser.start(job)
tests = parser.end(status_lines)
# The parser.end can return the same object multiple times, so filter out
# dups.
return list(set([test for test in tests]))
def _get_build_version(tests):
release_version_label = "CHROMEOS_RELEASE_VERSION"
milestone_label = "CHROMEOS_RELEASE_CHROME_MILESTONE"
for test in tests:
if not test.subdir:
continue
release = None
milestone = None
if release_version_label in test.attributes:
release = test.attributes[release_version_label]
if milestone_label in test.attributes:
milestone = test.attributes[milestone_label]
if release and milestone:
return "R%s-%s" % (milestone, release)
return ""
def _get_partial_object_path(bucket, moblab_id):
storage_client = storage.Client()
blob_itr = storage_client.bucket(bucket).list_blobs(prefix=moblab_id)
for blob in blob_itr:
if ("job.serialize" in blob.name and "moblab" in blob.name):
yield blob.name
def _confirm_option(question):
"""
Get a yes/no answer from the user via command line.
Args:
question: string, question to ask the user.
Returns:
A boolean. True if yes; False if no.
"""
expected_answers = ['y', 'yes', 'n', 'no']
answer = ''
while answer not in expected_answers:
answer = input(question + "(y/n): ").lower().strip()
return answer[0] == "y"
def _write_bug_id(test_dir, bug_id):
"""
Write the bug id to the test results.
Args:
test_dir: The test directory for non-moblab test results.
bug_id: The bug id to write to the test results.
Returns:
A boolean. True if the bug id is written successfully or is the same as
the old bug id already in test results; False if failed to write the
bug id, or if the user decides not to overwrite the old bug id already
in test results.
"""
old_bug_id = None
new_keyval = list()
keyval_file = os.path.join(test_dir, KEYVAL_FILE)
try:
with open(keyval_file, 'r') as keyval_raw:
for line in keyval_raw.readlines():
match = re.match(r'bug_id=(\d+)', line)
if match:
old_bug_id = _valid_bug_id(match.group(1))
else:
new_keyval.append(line)
except IOError as e:
logging.error(
'Cannot read keyval file from %s, skip writing the bug '
'id %s: %s', test_dir, bug_id, e)
return False
if old_bug_id:
if old_bug_id == bug_id:
return True
overwrite_bug_id = _confirm_option(
'Would you like to overwrite bug id '
'%s with new bug id %s?' % (old_bug_id, bug_id))
if not overwrite_bug_id:
return False
new_keyval.append('bug_id=%s' % bug_id)
new_keyval_file = os.path.join(test_dir, NEW_KEYVAL_FILE)
try:
with open(new_keyval_file, 'w') as new_keyval_raw:
for line in new_keyval:
new_keyval_raw.write(line)
new_keyval_raw.write('\n')
shutil.move(new_keyval_file, keyval_file)
return True
except Exception as e:
logging.error(
'Cannot write bug id to keyval file in %s, skip writing '
'the bug id %s: %s', test_dir, bug_id, e)
return False
def _valid_bug_id(v):
"""Check if user input bug id is in valid format.
Args:
v: User input bug id in string.
Returns:
An int representing the bug id.
Raises:
argparse.ArgumentTypeError: if user input bug id has wrong format.
"""
try:
bug_id = int(v)
except ValueError as e:
raise argparse.ArgumentTypeError(
"Bug id %s is not a positive integer: "
"%s" % (v, e))
if bug_id <= 0:
raise argparse.ArgumentTypeError(
"Bug id %s is not a positive integer" % v)
return bug_id
def _get_fake_moblab_id():
"""Get or generate a fake moblab id.
Moblab id is the unique id to a moblab device. Since the upload script runs
from the chroot instead of a moblab device, we need to generate a fake
moblab id to comply with the CPCon backend. If there is a previously saved
fake moblab id, read and use it. Otherwise, generate a uuid to fake a moblab
device, and store it in the same directory as the upload script.
Returns:
A string representing a fake moblab id.
"""
script_dir = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__)))
fake_moblab_id_path = os.path.join(script_dir, FAKE_MOBLAB_ID_FILE)
try:
with open(fake_moblab_id_path, "r") as fake_moblab_id_file:
fake_moblab_id = str(fake_moblab_id_file.read())[0:32]
if fake_moblab_id:
return fake_moblab_id
except IOError as e:
logging.info('Cannot find a fake moblab id at %s, creating a new one.',
fake_moblab_id_path)
fake_moblab_id = uuid.uuid4().hex
try:
with open(fake_moblab_id_path, "w") as fake_moblab_id_file:
fake_moblab_id_file.write(fake_moblab_id)
except IOError as e:
logging.warning('Unable to write the fake moblab id to %s: %s',
fake_moblab_id_path, e)
return fake_moblab_id
def print_autotest_git_history(test_dir):
"""
Print the hash of the latest git commit of the autotest directory.
Args:
test_dir: The test directory for non-moblab test results.
"""
git_hash = subprocess.check_output(shlex.split(GIT_COMMAND),
cwd=AUTOTEST_DIR)
git_hash_path = os.path.join(test_dir, GIT_HASH_FILE)
with open(git_hash_path, "w") as git_hash_file:
git_hash_file.write(git_hash.decode("utf-8"))
def get_suite_name(results_dir):
"""Get the suite name from a results directory.
If we don't find the suite name in the first ten lines of test_that.DEBUG
then return None.
Args:
results_dir: The directory specified on the command line.
"""
debug_file = os.path.join(results_dir, DEBUG_FILE_PATH)
if not os.path.exists(debug_file) or not os.path.isfile(debug_file):
return None
exp = re.compile(SUITE_NAME_REGEX)
try:
with open(debug_file) as f:
line_count = 0
for line in f:
line_count += 1
if line_count > 10:
break
result = exp.search(line)
if not result:
continue
else:
return result.group(1)
except IOError as e:
logging.warning('Error trying to read test_that.DEBUG: %s', e)
return None
def read_until_string(pipe, stop_string):
lines = [""]
while True:
c = pipe.read(1)
lines[-1] = lines[-1] + c.decode("utf-8")
if stop_string == lines[-1]:
return lines
if c.decode("utf-8") == "\n":
lines.append("")
def configure_environment(parsed_args):
# create config directory if not exists
os.makedirs(CONFIG_DIR, exist_ok=True)
if os.path.exists(UPLOAD_CONFIG) and not parsed_args.force:
logging.error("Environment already configured, run with --force")
exit()
# call the gsutil config tool to set up accounts
if os.path.exists(DEFAULT_BOTO_CONFIG + ".bak"):
os.remove(DEFAULT_BOTO_CONFIG + ".bak")
if os.path.exists(DEFAULT_BOTO_CONFIG):
os.remove(DEFAULT_BOTO_CONFIG)
os.mknod(DEFAULT_BOTO_CONFIG)
os.environ["BOTO_CONFIG"] = DEFAULT_BOTO_CONFIG
with subprocess.Popen(["gsutil", "config"],
stdout=PIPE,
stderr=PIPE,
stdin=PIPE) as sp:
lines = read_until_string(sp.stdout, "Enter the authorization code: ")
code = input("enter auth code from " + str(lines[1]) + ": ")
sp.stdin.write(bytes(code + '\n', "utf-8"))
sp.stdin.flush()
lines = read_until_string(sp.stdout, "What is your project-id? ")
sp.stdin.write(bytes(parsed_args.bucket + '\n', "utf-8"))
sp.stdin.flush()
# use configured gsutil to download service_account to config
client = storage.Client()
bucket = client.lookup_bucket(parsed_args.bucket)
blob = bucket.get_blob(".service_account.json")
if bucket is None:
logging.error("Invalid bucket selected, try a different bucket")
exit()
else:
logging.info("Bucket name saved to config")
with open(SERVICE_ACCOUNT_CONFIG, "wb") as file_obj:
file_obj.write(blob.download_as_bytes())
# deposit parsed_args.bucket to the json file
with open(UPLOAD_CONFIG, "w") as cf:
settings = {}
settings["bucket"] = parsed_args.bucket
settings["service_account"] = CONFIG_DIR + ".service_account.json"
settings["boto_key"] = DEFAULT_BOTO_CONFIG
cf.write(json.dumps(settings))
def load_config():
mandatory_keys = ["bucket", "service_account", "boto_key"]
if not os.path.exists(UPLOAD_CONFIG):
logging.error("Missing mandatory config file, run config command")
exit()
with open(UPLOAD_CONFIG, "r") as cf:
settings = json.load(cf)
for key in mandatory_keys:
if key not in settings:
logging.error("Missing mandatory setting " + str(key) +
", run config command")
exit()
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = settings["service_account"]
os.environ["BOTO_CONFIG"] = settings["boto_key"]
return argparse.Namespace(**settings)
def main(args):
parsed_args = parse_arguments(args)
logger = logging.getLogger()
fmt = logging.Formatter('%(asctime)s :: %(levelname)-8s :: %(message)s')
log_level = logging.INFO
if parsed_args.verbose:
log_level = logging.DEBUG
# modify existing handlers
for handler in logger.handlers:
handler.setFormatter(fmt)
handler.setLevel(log_level)
logging.info("logging to %s", parsed_args.logfile)
hfile = logging.FileHandler(parsed_args.logfile, mode='w')
hfile.setFormatter(fmt)
hfile.setLevel(log_level)
logger.addHandler(hfile)
if parsed_args.subcommand == "config":
configure_environment(parsed_args)
return
persistent_settings = load_config()
# The non-moblab test results generated by test_that CLI don't have moblab
# id, moblab install id, suite name and job id. Thus, we need to fake these
# fields with valid values.
fake_moblab_id = _get_fake_moblab_id()
result_dirs = map(os.path.normpath, parsed_args.directory)
for directory in set(result_dirs):
# If suite name is specified on command line, use it.
# Otherwise try to get name from result_dir (-d on command line).
# Otherwise use the default suite name.
if parsed_args.suite:
fake_suite = parsed_args.suite
else:
result_dir_suite_name = get_suite_name(directory)
fake_suite = result_dir_suite_name or DEFAULT_SUITE_NAME
logging.info("suite name: %s", fake_suite)
test_dirs = set()
fetch_test_dirs(directory, test_dirs)
for test_dir in test_dirs:
# Uses a unique timestamp in milliseconds to fake the afe job id and
# skylab job id.
if parsed_args.bug:
_write_bug_id(test_dir, parsed_args.bug)
fake_job_id = str(int(time.time() * 1000))
job_keyval = models.job.read_keyval(test_dir)
job = parse_test_job(test_dir, job_keyval, fake_job_id, fake_suite)
if not job:
logging.warning(
"Failed to generate job.serialize file and "
"skipped the test directory: %s", test_dir)
continue
generate_job_serialize(test_dir, job, job_keyval)
print_autotest_git_history(test_dir)
if parsed_args.dry_run:
continue
# This process run is not a dry run
if not is_pubsub_sent(test_dir):
try:
upload_test_results(persistent_settings.bucket, test_dir,
job_keyval, fake_moblab_id,
fake_job_id)
send_pubsub_message(test_dir, persistent_settings.bucket,
fake_moblab_id, fake_job_id)
except Exception as e:
logging.warning(e)
continue
if __name__ == "__main__":
try:
main(sys.argv[1:])
except KeyboardInterrupt:
sys.exit(0)