Brillo Tier 1 PTS: Add job monitoring and result processing.

Now after the PTS sequence is launched, the tier1 kickoff script
will monitor the PTS job and its child jobs until either they
complete or the host goes into a bad state.

Then the results are processed via the same ResultCollector
we use for running suites.

Also fixed the get_jobs RPC to work properly when finished=False,
before it ignored this value when set to false.

BUG=b:24774860
TEST=Ran against a local moblab. PTS is kicked off, monitored and
results are properly fetched. Links outputted by the ResultCollector
also work as expected.

Change-Id: Ie179b54aa5b380fdd78b803e8e4c922003805759
Reviewed-on: https://chromium-review.googlesource.com/305263
Commit-Ready: Simran Basi <sbasi@chromium.org>
Tested-by: Simran Basi <sbasi@chromium.org>
Reviewed-by: Simran Basi <sbasi@chromium.org>
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index a322149..cb8a8ed 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -1077,7 +1077,6 @@
     all hosts have completed.
     -finished: Include only jobs for which all hosts have completed (or
     aborted).
-    At most one of these three fields should be specified.
 
     Extra type filter args for get_jobs:
     -suite: Include only jobs with child jobs.
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 9e96956..23ca47b 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -116,32 +116,27 @@
 def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
     """\
     Generate a SQL WHERE clause for job status filtering, and return it in
-    a dict of keyword args to pass to query.extra().  No more than one of
-    the parameters should be passed as True.
+    a dict of keyword args to pass to query.extra().
     * not_yet_run: all HQEs are Queued
     * finished: all HQEs are complete
     * running: everything else
     """
-    assert not ((not_yet_run and running) or
-                (not_yet_run and finished) or
-                (running and finished)), ('Cannot specify more than one '
-                                          'filter to this function')
-
+    if not (not_yet_run or running or finished):
+        return {}
     not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                   'WHERE status != "%s")'
                   % models.HostQueueEntry.Status.QUEUED)
     not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                     'WHERE not complete)')
 
+    where = []
     if not_yet_run:
-        where = ['id NOT IN ' + not_queued]
-    elif running:
-        where = ['(id IN %s) AND (id IN %s)' % (not_queued, not_finished)]
-    elif finished:
-        where = ['id NOT IN ' + not_finished]
-    else:
-        return {}
-    return {'where': where}
+        where.append('id NOT IN ' + not_queued)
+    if running:
+        where.append('(id IN %s) AND (id IN %s)' % (not_queued, not_finished))
+    if finished:
+        where.append('id NOT IN ' + not_finished)
+    return {'where': [' OR '.join(['(%s)' % x for x in where])]}
 
 
 def extra_job_type_filters(extra_args, suite=False,
diff --git a/server/frontend.py b/server/frontend.py
index 122a2cb..50f69f0 100644
--- a/server/frontend.py
+++ b/server/frontend.py
@@ -775,6 +775,17 @@
         return True
 
 
+    def abort_jobs(self, jobs):
+        """Abort a list of jobs.
+
+        Already completed jobs will not be affected.
+
+        @param jobs: List of job ids to abort.
+        """
+        for job in jobs:
+            self.run('abort_host_queue_entries', job_id=job)
+
+
 class TestResults(object):
     """
     Container class used to hold the results of the tests for a job
diff --git a/server/hosts/moblab_host.py b/server/hosts/moblab_host.py
index 66af7f5..842e794 100644
--- a/server/hosts/moblab_host.py
+++ b/server/hosts/moblab_host.py
@@ -45,6 +45,9 @@
         self.afe = frontend_wrappers.RetryingAFE(timeout_min=1,
                                                  user='moblab',
                                                  server=self.hostname)
+        self.tko = frontend_wrappers.RetryingTKO(timeout_min=1,
+                                                 user='moblab',
+                                                 server=self.hostname)
         # Clear the Moblab Image Storage so that staging an image is properly
         # tested.
         self.run('rm -rf %s/*' % MOBLAB_IMAGE_STORAGE)
diff --git a/server/site_tests/brillo_Gtests/brillo_Gtests.py b/server/site_tests/brillo_Gtests/brillo_Gtests.py
index f0987bd..7dc060d 100644
--- a/server/site_tests/brillo_Gtests/brillo_Gtests.py
+++ b/server/site_tests/brillo_Gtests/brillo_Gtests.py
@@ -119,7 +119,7 @@
             logging.error('The following gTest Suites failed: \n %s',
                           '\n'.join(failed_gtestSuites))
             raise error.TestFail(
-                    '\nNot all gTest Suites completed successfully.\n'
+                    'Not all gTest Suites completed successfully.\n'
                     '%s out of %s suites failed.\n'
                     'Failed Suites: %s' % (len(failed_gtestSuites),
                                            len(gtestSuites),
diff --git a/site_utils/brillo_tier1_kickoff.py b/site_utils/brillo_tier1_kickoff.py
index 22b9d63..c036255 100755
--- a/site_utils/brillo_tier1_kickoff.py
+++ b/site_utils/brillo_tier1_kickoff.py
@@ -8,6 +8,8 @@
 import logging
 import os
 import sys
+import tempfile
+import time
 import urllib2
 
 import common
@@ -15,6 +17,7 @@
 from autotest_lib.server import hosts
 from autotest_lib.server.hosts import moblab_host
 from autotest_lib.server.cros.dynamic_suite import control_file_getter
+from autotest_lib.site_utils import run_suite
 
 
 LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
@@ -23,6 +26,10 @@
 TARGET_IMAGE_NAME = 'brillo/target'
 DEVSERVER_STAGE_URL_TEMPLATE = ('http://%(moblab)s:8080/stage?local_path='
                                 '%(staged_dir)s&artifacts=full_payload')
+AFE_JOB_PAGE_TEMPLATE = ('http://%(moblab)s/afe/#tab_id=view_job&object_id='
+                         '%(job_id)s')
+AFE_HOST_PAGE_TEMPLATE = ('http://%(moblab)s/afe/#tab_id=view_host&object_id='
+                          '%(host_id)s')
 
 
 class KickOffException(Exception):
@@ -104,16 +111,101 @@
     @param moblab: MoblabHost representing the MobLab being used to launch the
                    testing.
     @param host: Hostname of the DUT.
+
+    @returns autotest_lib.server.frontend.Job object representing the scheduled
+             job.
     """
     getter = control_file_getter.FileSystemGetter(
             [os.path.dirname(os.path.dirname(os.path.realpath(__file__)))])
     pts_controlfile_conts = getter.get_control_file_contents_by_name(
             'control.brillo_pts')
-    job_id = moblab.afe.create_job(
+    job = moblab.afe.create_job(
             pts_controlfile_conts, name='brillo pts sequence',
             control_type=control_data.CONTROL_TYPE_NAMES.SERVER,
             hosts=[host], require_ssp=False)
-    return job_id
+    logging.info('\nBrillo PTS Scheduled. Please wait for results.')
+    job_page = AFE_JOB_PAGE_TEMPLATE % dict(moblab=moblab.hostname,
+                                            job_id=job.id)
+    logging.info('Progress can be monitored at %s', job_page)
+    logging.info('Please note the main job will complete quickly, links to '
+                 'child jobs will appear shortly at the bottom on the page '
+                 '(Hit Refresh).')
+    return job
+
+
+def get_all_jobs(moblab, pts_job):
+    """Generate a list of the pts_job and it's subjobs.
+
+    @param moblab: MoblabHost representing the MobLab being used to launch the
+                   testing.
+    @param host: Hostname of the DUT.
+    @param pts_job: autotest_lib.server.frontend.Job object representing the
+                    pts job.
+
+    @returns list of autotest_lib.server.frontend.Job objects.
+    """
+    jobs_list = moblab.afe.get_jobs(id=pts_job.id)
+    jobs_list.extend(moblab.afe.get_jobs(parent_job=pts_job.id))
+    return jobs_list
+
+
+def wait_for_pts_completion(moblab, host, pts_job):
+    """Wait for the Brillo PTS job and it's subjobs to complete.
+
+    @param moblab: MoblabHost representing the MobLab being used to launch the
+                   testing.
+    @param host: Hostname of the DUT.
+    @param pts_job: autotest_lib.server.frontend.Job object representing the
+                    pts job.
+    """
+    # Wait for the sequence job and it's sub-jobs to finish, while monitoring
+    # the DUT state. As long as the DUT does not go into 'Repair Failed' the
+    # tests will complete.
+    while (moblab.afe.get_jobs(id=pts_job.id, not_yet_run=True, running=True)
+           or moblab.afe.get_jobs(parent_job=pts_job.id, not_yet_run=True,
+                                  running=True)):
+        afe_host = moblab.afe.get_hosts(hostnames=(host,))[0]
+        if afe_host.status == 'Repair Failed':
+            moblab.afe.abort_jobs(
+                [j.id for j in get_all_jobs(moblab, pts_job)])
+            host_page = AFE_HOST_PAGE_TEMPLATE % dict(moblab=moblab.hostname,
+                                                      host_id=afe_host.id)
+            raise KickOffException(
+                    'ADB dut %s has become Repair Failed. More information '
+                    'can be found at %s' % (host, host_page))
+        time.sleep(10)
+
+
+def output_results(moblab, pts_job):
+    """Output the Brillo PTS and it's subjobs results.
+
+    @param moblab: MoblabHost representing the MobLab being used to launch the
+                   testing.
+    @param pts_job: autotest_lib.server.frontend.Job object representing the
+                    pts job.
+    """
+    rc = run_suite.ResultCollector(moblab.hostname, moblab.afe, moblab.tko,
+                                   None, None, 'brillo_pts', pts_job.id,
+                                   user='moblab')
+    rc.run()
+    rc.output_results()
+
+
+def copy_results(moblab, pts_job):
+    """Copy job results locally.
+
+    @param moblab: MoblabHost representing the MobLab being used to launch the
+                   testing.
+    @param pts_job: autotest_lib.server.frontend.Job object representing the
+                    pts job.
+
+    @returns Temporary directory path.
+    """
+    tempdir = tempfile.mkdtemp(prefix='brillo_pts_results')
+    for job in get_all_jobs(moblab, pts_job):
+        moblab.get_file('/usr/local/autotest/results/%d-moblab' % job.id,
+                        tempdir)
+    return tempdir
 
 
 def main(args):
@@ -141,9 +233,17 @@
     # Stage the payload if provided.
     if args.payload:
         stage_payload(moblab, args.payload)
-    # Launch PTS.
-    sequence_jobid = schedule_pts(moblab, adb_host)
-    # TODO (sbasi): Add job monitoring, and result collection.
+    # Schedule the PTS Job.
+    sequence_job = schedule_pts(moblab, adb_host)
+    try:
+        wait_for_pts_completion(moblab, adb_host, sequence_job)
+    except KickOffException as e:
+        logging.error(e)
+        return 1
+    local_results_folder = copy_results(moblab, sequence_job)
+    output_results(moblab, sequence_job)
+    logging.info('Results have also been copied locally to %s',
+                 local_results_folder)
     return 0
 
 
diff --git a/site_utils/run_suite.py b/site_utils/run_suite.py
index 1d08f80..68e0aa4 100755
--- a/site_utils/run_suite.py
+++ b/site_utils/run_suite.py
@@ -617,7 +617,7 @@
     INFRA_TESTS = ['provision']
 
 
-    def __init__(self, view, afe_job, suite_name, build):
+    def __init__(self, view, afe_job, suite_name, build, user):
         """Init a TestView object representing a tko test view.
 
         @param view: A dictionary representing a tko test view.
@@ -626,6 +626,7 @@
         @param suite_name: The name of the suite
                            that the test belongs to.
         @param build: The build for which the test is run.
+        @param user: The user for which the test is run.
         """
         self.view = view
         self.afe_job = afe_job
@@ -634,6 +635,7 @@
         self.is_suite_view = afe_job.parent_job is None
         # This is the test name that will be shown in the output.
         self.testname = None
+        self.user = user
 
         # The case that a job was aborted before it got a chance to run
         # usually indicates suite has timed out (unless aborted by user).
@@ -871,7 +873,7 @@
         @returns: A string which looks like 135036-username
 
         """
-        return '%s-%s' % (self.view['afe_job_id'], getpass.getuser())
+        return '%s-%s' % (self.view['afe_job_id'], self.user)
 
 
     def get_bug_info(self, suite_job_keyvals):
@@ -1005,7 +1007,8 @@
 
 
     def __init__(self, instance_server, afe, tko, build, board,
-                 suite_name, suite_job_id, original_suite_name=None):
+                 suite_name, suite_job_id, original_suite_name=None,
+                 user=None):
         self._instance_server = instance_server
         self._afe = afe
         self._tko = tko
@@ -1026,6 +1029,7 @@
         self.return_message = ''
         self.is_aborted = None
         self.timings = None
+        self._user = user or getpass.getuser()
 
 
     def _fetch_relevant_test_views_of_suite(self):
@@ -1061,7 +1065,7 @@
                               afe_job_id=self._suite_job_id)
         relevant_views = []
         for v in views:
-            v = TestView(v, suite_job, self._suite_name, self._build)
+            v = TestView(v, suite_job, self._suite_name, self._build, self._user)
             if v.is_relevant_suite_view():
                 relevant_views.append(v)
         return relevant_views
@@ -1101,7 +1105,7 @@
         if child_jobs:
             self._num_child_jobs = len(child_jobs)
         for job in child_jobs:
-            views = [TestView(v, job, self._suite_name, self._build)
+            views = [TestView(v, job, self._suite_name, self._build, self._user)
                      for v in self._tko.run(
                          call='get_detailed_test_views', afe_job_id=job.id,
                          invalid=0)]
diff --git a/site_utils/run_suite_unittest.py b/site_utils/run_suite_unittest.py
index 6e2f5a4..7298e92 100755
--- a/site_utils/run_suite_unittest.py
+++ b/site_utils/run_suite_unittest.py
@@ -5,7 +5,6 @@
 
 import datetime as datetime_base
 from datetime import datetime
-import getpass
 import mock
 import time
 import unittest
@@ -209,19 +208,19 @@
         suite_job_view = run_suite.TestView(
                 self._build_view(
                     20, 'Suite prep', '----', 'GOOD', suite_job_id),
-                fake_job, suite_name, build)
+                fake_job, suite_name, build, 'chromeos-test')
         good_test = run_suite.TestView(
                 self._build_view(
                     21, 'test_Pass', 'fake/subdir', 'GOOD', 101),
-                fake_job, suite_name, build)
+                fake_job, suite_name, build, 'chromeos-test')
         bad_test = run_suite.TestView(
                 self._build_view(
                     23, 'test_Fail', 'fake/subdir', 'FAIL', 102),
-                fake_job, suite_name, build)
+                fake_job, suite_name, build, 'chromeos-test')
 
         collector = run_suite.ResultCollector(
                 'fake_server', self.afe, self.tko,
-                build, board, suite_name, suite_job_id)
+                build, board, suite_name, suite_job_id, user='chromeos-test')
         collector._suite_views = [suite_job_view]
         collector._test_views = [suite_job_view, good_test, bad_test]
         collector._max_testname_width = max(
@@ -233,7 +232,7 @@
         expected_web_links = [
                  (v.get_testname().ljust(collector._max_testname_width),
                   URL_PATTERN % ('fake_server',
-                                '%s-%s' % (v['afe_job_id'], getpass.getuser())))
+                                '%s-%s' % (v['afe_job_id'], 'chromeos-test')))
                  for v in collector._test_views]
         # Verify web links are generated correctly.
         for i in range(len(collector._web_links)):
@@ -244,7 +243,7 @@
         expected_buildbot_links = [
                  (v.get_testname().ljust(collector._max_testname_width),
                   URL_PATTERN % ('fake_server',
-                                '%s-%s' % (v['afe_job_id'], getpass.getuser())))
+                                '%s-%s' % (v['afe_job_id'], 'chromeos-test')))
                  for v in collector._test_views if v['status'] != 'GOOD']
         # Verify buildbot links are generated correctly.
         for i in range(len(collector._buildbot_links)):