| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import argparse |
| import logging |
| import os |
| import re |
| import shlex |
| import shutil |
| import subprocess |
| import sys |
| import time |
| import uuid |
| |
| from google.cloud import storage |
| |
| # Appends the third_party.autotest and src paths so that the script can import |
| # libraries under these paths. |
| sys.path.append('/mnt/host/source/src/platform/moblab/src') |
| |
| # pylint: disable=no-name-in-module, import-error |
| import common |
| from tko import job_serializer, models, parser_lib |
| from moblab_common import pubsub_client |
| |
| os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", |
| "%s/.service_account.json" % os.environ["HOME"]) |
| |
| CURRENT_TIMESTAMP = int(time.time()) |
| FAKE_JOB_ID = CURRENT_TIMESTAMP |
| CONTROL_FILE = "control" |
| JOB_SERIALIZE_FILE = "job.serialize" |
| STATUS_FILE = "status" |
| KEYVAL_FILE = "keyval" |
| NEW_KEYVAL_FILE = "new_keyval" |
| UPLOADED_STATUS_FILE = ".uploader_status" |
| STATUS_GOOD = "PUBSUB_SENT" |
| FAKE_MOBLAB_ID_FILE = "fake_moblab_id_do_not_delete.txt" |
| GIT_HASH_FILE = "git_hash.txt" |
| GIT_COMMAND = ("git log --pretty=format:'%h -%d %s (%ci) <%an>'" |
| " --abbrev-commit -20") |
| AUTOTEST_DIR = "/mnt/host/source/src/third_party/autotest/files/" |
| DEFAULT_SUITE_NAME = "default_suite" |
| SUITE_NAME_REGEX = "Fetching suite for suite named (.+?)\.\.\." |
| DEBUG_FILE_PATH = "debug/test_that.DEBUG" |
| |
| |
| def parse_arguments(argv): |
| """Creates the argument parser. |
| |
| Args: |
| argv: A list of input arguments. |
| |
| Returns: |
| A parser object for input arguments. |
| """ |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument( |
| "-b", |
| "--bucket", |
| type=str, |
| required=True, |
| help="The GCS bucket that test results are uploaded to, e.g." |
| "'gs://xxxx'.") |
| parser.add_argument( |
| "--bug", |
| type=_valid_bug_id, |
| required=False, |
| help= |
| "Write bug id to the test results. Each test entry can only have " |
| "at most 1 bug id. Optional.") |
| parser.add_argument("-d", |
| "--directory", |
| type=str, |
| required=True, |
| nargs='+', |
| help="The directory of non-Moblab test results.") |
| parser.add_argument( |
| "--dry_run", |
| action='store_true', |
| help="Generate job.serialize locally but do not upload test " |
| "directories and not send pubsub messages.") |
| def_logfile = "/tmp/" + os.path.basename( |
| sys.argv[0].split(".")[0]) + ".log" |
| parser.add_argument("-l", |
| "--logfile", |
| type=str, |
| required=False, |
| default=def_logfile, |
| help="Full path to logfile. Default: " + def_logfile) |
| parser.add_argument( |
| "-s", |
| "--suite", |
| type=str, |
| default=None, |
| help="The suite is used to identify the type of test results," |
| "e.g. 'power' for platform power team. If not specific, the " |
| "default value is 'default_suite'.") |
| parser.add_argument("-v", |
| "--verbose", |
| action='store_true', |
| help="Enable verbose (debug) logging.") |
| return parser.parse_args(argv) |
| |
| |
| def fetch_test_dirs(parent_dir, test_dirs): |
| """ Gets all test directories. |
| |
| Args: |
| parent_dir: The parent directory of one or multiple test directories |
| test_dirs: The output set of test directories. |
| """ |
| if not os.path.exists(parent_dir) or not os.path.isdir(parent_dir): |
| logging.warning('Test directory does not exist: %s' % parent_dir) |
| return |
| |
| control_file = os.path.join(parent_dir, CONTROL_FILE) |
| status_file = os.path.join(parent_dir, STATUS_FILE) |
| if os.path.exists(control_file) and os.path.exists(status_file): |
| test_dirs.add(parent_dir) |
| return |
| |
| for dir_name in os.listdir(parent_dir): |
| subdir = os.path.join(parent_dir, dir_name) |
| if os.path.isdir(subdir): |
| fetch_test_dirs(subdir, test_dirs) |
| |
| |
| def parse_test_job(test_dir, job_keyval, job_id, suite): |
| """ |
| Parses test results and get the job object for the given test directory. |
| The job object will be used to generate job.serialize file. |
| |
| Args: |
| test_dir: The test directory for non-moblab test results. |
| job_keyval: The key-value object of the job. |
| job_id: A job_id. |
| suite: A suite name. |
| |
| Returns: |
| The job object or None if build version not found. |
| """ |
| logging.info("Start to parse the test job for: %s", test_dir) |
| |
| # Looks up the status version and hostname |
| status_version = job_keyval.get("status_version", 0) |
| parser = parser_lib.parser(status_version) |
| |
| status_log_path = _find_status_log_path(test_dir) |
| if not status_log_path: |
| logging.warning('No status log file exists in: %s', test_dir) |
| return None |
| |
| job = parser.make_job(test_dir) |
| # workaround for CPCon pipeline, "board" is mandatory |
| job.board = job.machine_group |
| |
| job.tests = _get_job_tests(parser, job, status_log_path) |
| |
| # The job id and suite name are necessary for CPCon pipeline. Since |
| # non-moblab test results don't have these fields, the values here are fake. |
| # Set a different afe_paren_job_id so that the test won't be regarded as a |
| # suite test. |
| job.afe_job_id = job_id |
| job.afe_parent_job_id = job_id + "1" |
| job.suite = suite |
| |
| job.build_version = _get_build_version(job.tests) |
| job.build = job.build_version |
| if not job.build_version: |
| logging.warning('Failed to get build version in: %s', test_dir) |
| return None |
| |
| logging.info("Successfully parsed the job.serialize for: %s", test_dir) |
| return job |
| |
| |
| def generate_job_serialize(test_dir, job, job_keyval): |
| """ |
| Generate the job.serialize for the given test directory, job object and |
| job key-value object. |
| |
| Args: |
| test_dir: The test directory for non-moblab test results. |
| job: A job object. |
| job_keyval: The key-value object of the job. |
| """ |
| job_serialize_file = os.path.join(test_dir, JOB_SERIALIZE_FILE) |
| logging.info("Start to generate the job.serialize for: %s", |
| job_serialize_file) |
| |
| hostname = job_keyval.get('hostname', '0.0.0.0') # ip address |
| job_name = "%s-moblab/%s" % (job.afe_job_id, hostname) |
| |
| serializer = job_serializer.JobSerializer() |
| serializer.serialize_to_binary(job, job_name, job_serialize_file) |
| logging.info("Successfully generated the job.serialize for: %s", |
| job_serialize_file) |
| |
| |
| def is_pubsub_sent(test_dir): |
| """ |
| Checks if the message for the uploaded bucket has been sent. |
| |
| Args: |
| test_dir: The test directory for non-moblab test results. |
| """ |
| upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE) |
| if not os.path.exists(upload_status_file): |
| logging.debug("The upload status file %s does not exist.", |
| upload_status_file) |
| return False |
| |
| with open(upload_status_file, "r") as upload_status: |
| if upload_status.read() == STATUS_GOOD: |
| logging.warn( |
| "The test directory: %s status has already been " |
| "sent to CPCon and the .upload_status file has " |
| "been set to PUBSUB_SENT.", test_dir) |
| return True |
| else: |
| logging.debug("The pubsub message was not successful") |
| return False |
| |
| |
| def upload_test_results(args, test_dir, job_keyval, moblab_id, job_id): |
| """ |
| Upload the test directory with job.serialize to GCS bucket. |
| |
| Args: |
| args: A list of input arguments. |
| test_dir: The test directory for non-moblab test results. |
| job_keyval: The key-value object of the job. |
| moblab_id: A string that represents the unique id of a moblab device. |
| job_id: A job id. |
| """ |
| upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE) |
| with open(upload_status_file, "w") as upload_status: |
| upload_status.write("UPLOADING") |
| |
| fake_moblab_id = moblab_id |
| fake_moblab_install_id = moblab_id |
| hostname = job_keyval.get('hostname', '0.0.0.0') # ip address |
| |
| bucket = args.bucket |
| gcs_bucket_path = os.path.join("gs://%s" % bucket, "results", |
| fake_moblab_id, fake_moblab_install_id, |
| "%s-moblab" % job_id, hostname) |
| |
| try: |
| logging.info( |
| "Start to upload test directory: %s to GCS bucket path: %s", |
| test_dir, gcs_bucket_path) |
| with open(upload_status_file, "w") as upload_status: |
| upload_status.write("UPLOADED") |
| |
| cmd = "gsutil -m cp -r %s %s" % (test_dir, gcs_bucket_path) |
| subprocess.check_output(shlex.split(cmd)) |
| |
| logging.info( |
| "Successfully uploaded test directory: %s to GCS bucket path: %s", |
| test_dir, gcs_bucket_path) |
| except Exception as e: |
| with open(upload_status_file, "w") as upload_status: |
| upload_status.write("UPLOAD_FAILED") |
| raise Exception("Failed to upload test directory: %s to GCS bucket " |
| "path: %s for the error: %s" % |
| (test_dir, gcs_bucket_path, e)) |
| |
| |
| def send_pubsub_message(test_dir, bucket, moblab_id, job_id): |
| """ |
| Send pubsub messages to trigger CPCon pipeline to process non-moblab |
| test results in the specific GCS bucket path. |
| |
| Args: |
| bucket: The GCS bucket. |
| moblab_id: A moblab id. |
| job_id: A job id. |
| """ |
| moblab_install_id = moblab_id |
| console_client = pubsub_client.PubSubBasedClient() |
| gsuri = "gs://%s/results/%s/%s/%s-moblab" % (bucket, moblab_id, |
| moblab_install_id, job_id) |
| |
| try: |
| logging.info("Start to send the pubsub message to GCS path: %s", gsuri) |
| message_id = \ |
| console_client.send_test_job_offloaded_message(gsuri, |
| moblab_id, |
| moblab_install_id) |
| upload_status_file = os.path.join(test_dir, UPLOADED_STATUS_FILE) |
| with open(upload_status_file, "w") as upload_status: |
| upload_status.write(STATUS_GOOD) |
| |
| logging.info( |
| "Successfully sent the pubsub message with message id: %s to GCS " |
| "path: %s", message_id[0], gsuri) |
| except Exception as e: |
| raise Exception("Failed to send the pubsub message with moblab id: %s " |
| "and job id: %s to GCS path: %s for the error: %s" % |
| (moblab_id, job_id, gsuri, e)) |
| |
| |
| def _find_status_log_path(test_dir): |
| log_path = os.path.join(test_dir, "status.log") |
| if os.path.exists(log_path): |
| return log_path |
| |
| log_path = os.path.join(test_dir, "status") |
| if os.path.exists(log_path): |
| return log_path |
| return "" |
| |
| |
| def _get_job_tests(parser, job, status_log_path): |
| status_lines = open(status_log_path).readlines() |
| parser.start(job) |
| tests = parser.end(status_lines) |
| |
| # The parser.end can return the same object multiple times, so filter out |
| # dups. |
| return list(set([test for test in tests])) |
| |
| |
| def _get_build_version(tests): |
| release_version_label = "CHROMEOS_RELEASE_VERSION" |
| milestone_label = "CHROMEOS_RELEASE_CHROME_MILESTONE" |
| for test in tests: |
| if not test.subdir: |
| continue |
| |
| release = None |
| milestone = None |
| if release_version_label in test.attributes: |
| release = test.attributes[release_version_label] |
| if milestone_label in test.attributes: |
| milestone = test.attributes[milestone_label] |
| if release and milestone: |
| return "R%s-%s" % (milestone, release) |
| |
| return "" |
| |
| |
| def _get_partial_object_path(bucket, moblab_id): |
| storage_client = storage.Client() |
| blob_itr = storage_client.bucket(bucket).list_blobs(prefix=moblab_id) |
| |
| for blob in blob_itr: |
| if ("job.serialize" in blob.name and "moblab" in blob.name): |
| yield blob.name |
| |
| |
| def _confirm_option(question): |
| """ |
| Get a yes/no answer from the user via command line. |
| |
| Args: |
| question: string, question to ask the user. |
| |
| Returns: |
| A boolean. True if yes; False if no. |
| """ |
| expected_answers = ['y', 'yes', 'n', 'no'] |
| answer = '' |
| while answer not in expected_answers: |
| answer = input(question + "(y/n): ").lower().strip() |
| return answer[0] == "y" |
| |
| |
| def _write_bug_id(test_dir, bug_id): |
| """ |
| Write the bug id to the test results. |
| |
| Args: |
| test_dir: The test directory for non-moblab test results. |
| bug_id: The bug id to write to the test results. |
| Returns: |
| A boolean. True if the bug id is written successfully or is the same as |
| the old bug id already in test results; False if failed to write the |
| bug id, or if the user decides not to overwrite the old bug id already |
| in test results. |
| """ |
| old_bug_id = None |
| new_keyval = list() |
| |
| keyval_file = os.path.join(test_dir, KEYVAL_FILE) |
| try: |
| with open(keyval_file, 'r') as keyval_raw: |
| for line in keyval_raw.readlines(): |
| match = re.match(r'bug_id=(\d+)', line) |
| if match: |
| old_bug_id = _valid_bug_id(match.group(1)) |
| else: |
| new_keyval.append(line) |
| except IOError as e: |
| logging.error( |
| 'Cannot read keyval file from %s, skip writing the bug ' |
| 'id %s: %s', test_dir, bug_id, e) |
| return False |
| |
| if old_bug_id: |
| if old_bug_id == bug_id: |
| return True |
| overwrite_bug_id = _confirm_option( |
| 'Would you like to overwrite bug id ' |
| '%s with new bug id %s?' % (old_bug_id, bug_id)) |
| if not overwrite_bug_id: |
| return False |
| |
| new_keyval.append('bug_id=%s' % bug_id) |
| new_keyval_file = os.path.join(test_dir, NEW_KEYVAL_FILE) |
| try: |
| with open(new_keyval_file, 'w') as new_keyval_raw: |
| for line in new_keyval: |
| new_keyval_raw.write(line) |
| new_keyval_raw.write('\n') |
| shutil.move(new_keyval_file, keyval_file) |
| return True |
| except Exception as e: |
| logging.error( |
| 'Cannot write bug id to keyval file in %s, skip writing ' |
| 'the bug id %s: %s', test_dir, bug_id, e) |
| return False |
| |
| |
| def _valid_bug_id(v): |
| """Check if user input bug id is in valid format. |
| |
| Args: |
| v: User input bug id in string. |
| Returns: |
| An int representing the bug id. |
| Raises: |
| argparse.ArgumentTypeError: if user input bug id has wrong format. |
| """ |
| try: |
| bug_id = int(v) |
| except ValueError as e: |
| raise argparse.ArgumentTypeError( |
| "Bug id %s is not a positive integer: " |
| "%s" % (v, e)) |
| if bug_id <= 0: |
| raise argparse.ArgumentTypeError( |
| "Bug id %s is not a positive integer" % v) |
| return bug_id |
| |
| |
| def _get_fake_moblab_id(): |
| """Get or generate a fake moblab id. |
| |
| Moblab id is the unique id to a moblab device. Since the upload script runs |
| from the chroot instead of a moblab device, we need to generate a fake |
| moblab id to comply with the CPCon backend. If there is a previously saved |
| fake moblab id, read and use it. Otherwise, generate a uuid to fake a moblab |
| device, and store it in the same directory as the upload script. |
| |
| Returns: |
| A string representing a fake moblab id. |
| """ |
| script_dir = os.path.realpath( |
| os.path.join(os.getcwd(), os.path.dirname(__file__))) |
| fake_moblab_id_path = os.path.join(script_dir, FAKE_MOBLAB_ID_FILE) |
| try: |
| with open(fake_moblab_id_path, "r") as fake_moblab_id_file: |
| fake_moblab_id = str(fake_moblab_id_file.read())[0:32] |
| if fake_moblab_id: |
| return fake_moblab_id |
| except IOError as e: |
| logging.info('Cannot find a fake moblab id at %s, creating a new one.', |
| fake_moblab_id_path) |
| fake_moblab_id = uuid.uuid4().hex |
| try: |
| with open(fake_moblab_id_path, "w") as fake_moblab_id_file: |
| fake_moblab_id_file.write(fake_moblab_id) |
| except IOError as e: |
| logging.warning('Unable to write the fake moblab id to %s: %s', |
| fake_moblab_id_path, e) |
| return fake_moblab_id |
| |
| |
| def print_autotest_git_history(test_dir): |
| """ |
| Print the hash of the latest git commit of the autotest directory. |
| |
| Args: |
| test_dir: The test directory for non-moblab test results. |
| """ |
| git_hash = subprocess.check_output(shlex.split(GIT_COMMAND), |
| cwd=AUTOTEST_DIR) |
| git_hash_path = os.path.join(test_dir, GIT_HASH_FILE) |
| with open(git_hash_path, "w") as git_hash_file: |
| git_hash_file.write(git_hash.decode("utf-8")) |
| |
| |
| def get_suite_name(results_dir): |
| """Get the suite name from a results directory. |
| |
| If we don't find the suite name in the first ten lines of test_that.DEBUG |
| then return None. |
| |
| Args: |
| results_dir: The directory specified on the command line. |
| """ |
| debug_file = os.path.join(results_dir, DEBUG_FILE_PATH) |
| if not os.path.exists(debug_file) or not os.path.isfile(debug_file): |
| return None |
| exp = re.compile(SUITE_NAME_REGEX) |
| try: |
| with open(debug_file) as f: |
| line_count = 0 |
| for line in f: |
| line_count += 1 |
| if line_count > 10: |
| break |
| result = exp.search(line) |
| if not result: |
| continue |
| else: |
| return result.group(1) |
| except IOError as e: |
| logging.warning('Error trying to read test_that.DEBUG: %s', e) |
| return None |
| |
| |
| def main(args): |
| parsed_args = parse_arguments(args) |
| bucket = parsed_args.bucket |
| |
| logger = logging.getLogger() |
| fmt = logging.Formatter('%(asctime)s :: %(levelname)-8s :: %(message)s') |
| |
| log_level = logging.INFO |
| if parsed_args.verbose: |
| log_level = logging.DEBUG |
| |
| # modify existing handlers |
| for handler in logger.handlers: |
| handler.setFormatter(fmt) |
| handler.setLevel(log_level) |
| |
| logging.info("logging to %s", parsed_args.logfile) |
| |
| hfile = logging.FileHandler(parsed_args.logfile, mode='w') |
| hfile.setFormatter(fmt) |
| hfile.setLevel(log_level) |
| logger.addHandler(hfile) |
| |
| # The non-moblab test results generated by test_that CLI don't have moblab |
| # id, moblab install id, suite name and job id. Thus, we need to fake these |
| # fields with valid values. |
| fake_moblab_id = _get_fake_moblab_id() |
| |
| result_dirs = map(os.path.normpath, parsed_args.directory) |
| for directory in set(result_dirs): |
| # If suite name is specified on command line, use it. |
| # Otherwise try to get name from result_dir (-d on command line). |
| # Otherwise use the default suite name. |
| if parsed_args.suite: |
| fake_suite = parsed_args.suite |
| else: |
| result_dir_suite_name = get_suite_name(directory) |
| fake_suite = result_dir_suite_name or DEFAULT_SUITE_NAME |
| logging.info("suite name: %s", fake_suite) |
| |
| test_dirs = set() |
| fetch_test_dirs(directory, test_dirs) |
| for test_dir in test_dirs: |
| # Uses a unique timestamp in milliseconds to fake the afe job id and |
| # skylab job id. |
| if parsed_args.bug: |
| _write_bug_id(test_dir, parsed_args.bug) |
| fake_job_id = str(int(time.time() * 1000)) |
| job_keyval = models.job.read_keyval(test_dir) |
| job = parse_test_job(test_dir, job_keyval, fake_job_id, fake_suite) |
| |
| if not job: |
| logging.warning( |
| "Failed to generate job.serialize file and " |
| "skipped the test directory: %s", test_dir) |
| continue |
| |
| generate_job_serialize(test_dir, job, job_keyval) |
| |
| print_autotest_git_history(test_dir) |
| |
| if parsed_args.dry_run: |
| continue |
| |
| # This process run is not a dry run |
| if not is_pubsub_sent(test_dir): |
| try: |
| upload_test_results(parsed_args, test_dir, job_keyval, |
| fake_moblab_id, fake_job_id) |
| send_pubsub_message(test_dir, bucket, fake_moblab_id, |
| fake_job_id) |
| except Exception as e: |
| logging.warning(e) |
| continue |
| |
| |
| if __name__ == "__main__": |
| try: |
| main(sys.argv[1:]) |
| except KeyboardInterrupt: |
| sys.exit(0) |