blob: 73d2f4d8ddf750100f81fc3e231db5f75a30b038 [file] [log] [blame]
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
'''Utilities to summarize TKO results reported by tests in the suite.'''
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import collections
import contextlib
import mysql.connector
def Error(Exception):
"""Error detected in this script."""
# Row corresponds to a single row of tko_test_view_2 table in AFE DB, but
# contains only a subset of the columns in the table.
Row = collections.namedtuple(
'name, status, reason'
def get(conn, task_ids):
"""Get tko_test_view_2 Row()s for given skylab task_ids.
@param conn: A MySQL connection to TKO.
@param task_ids: list of Skylab task request IDs to collect test views for.
@return: {task_id: [Row(...)...]}
task_job_ids = _get_job_idxs_from_tko(conn, task_ids)
if not task_job_ids:
return {}
job_task_ids = {v: k for k, v in task_job_ids.iteritems()}
job_rows = _get_rows_from_tko(conn, job_task_ids.keys())
return {job_task_ids[k]: v for k, v in job_rows.iteritems()}
def filter_failed(rows):
"""Filter down given list of test_views Row() to failed tests."""
return [r for r in rows if r.status in _BAD_STATUSES]
def main():
'''Entry-point to use this script standalone.'''
parser = argparse.ArgumentParser(
description='Summarize TKO results for a Skylab task')
help='Swarming request ID for the skylab task (may be repeated)',
help='TKO host IP',
help='TKO port',
help='TKO MySQL user',
help='TKO MySQL password',
args = parser.parse_args()
if not args.task_id:
raise Error('Must request at least one --task-id')
conn = mysql.connector.connect(,
views = get(conn, args.task_id)
for task_id, rows in views.iteritems():
print('Task ID: %s' % task_id)
for row in filter_failed(rows):
print(' %s in status %s' % (, row.status))
print(' reason: %s' % (row.reason,))
def _get_rows_from_tko(conn, tko_job_ids):
"""Get a list of Row() for the given TKO job IDs.
@param conn: A MySQL connection.
@param job_ids: List of tko_job_ids to get Row()s for.
@return: {tko_job_id: [Row]}
job_rows = collections.defaultdict(list)
statuses = _get_status_map(conn)
SELECT job_idx, test_name, status_idx, reason FROM tko_test_view_2
WHERE invalid = 0 AND job_idx IN (%s)
q = _GET_TKO_TEST_VIEW_2 % ', '.join(['%s'] * len(tko_job_ids))
r = _run_query(conn, q, tko_job_ids)
for job_idx, name, s_idx, reason in r:
Row(name, statuses.get(s_idx, 'UNKNOWN'), reason))
return dict(job_rows)
def _get_job_idxs_from_tko(conn, task_ids):
"""Get tko_job_idx for given task_ids.
Task execution reports the run ID to TKO, but Skylab clients only knows the
request ID of the created task.
Swarming executes a task with increasing run IDs, retrying on bot failure.
If a task is retried after the point where TKO results are reported, this
function returns the TKO job_idx corresponding to the last completed
@param conn: MySQL connection to TKO.
@param task_ids: List of task request IDs to get TKO job IDs for.
@return {task_id: job_id}
task_runs = {}
run_ids = []
for task_id in task_ids:
# Skylab task results have used both request ID and run ID due to log
# directory naming changes. So try both. See crbug/937432.
run_ids += _run_ids_for_request(task_id)
task_runs[task_id] = list(reversed(run_ids))
run_job_idxs = _get_job_idxs_for_run_ids(conn, run_ids)
task_job_idxs = {}
for task_id, run_ids in task_runs.iteritems():
for run_id in run_ids:
if run_id in run_job_idxs:
task_job_idxs[task_id] = run_job_idxs[run_id]
return task_job_idxs
def _get_job_idxs_for_run_ids(conn, run_ids):
"""Get tko_job_idx for a given task run_ids.
@param conn: MySQL connection to TKO.
@param task_ids: List of task run IDs to get TKO job IDs for.
@return {run_id: job_id}
_GET_TKO_JOB_Q = """
SELECT task_id, tko_job_idx FROM tko_task_references
WHERE reference_type = "skylab" AND task_id IN (%s)
q = _GET_TKO_JOB_Q % ', '.join(['%s'] * len(run_ids))
job_idxs = {}
r = _run_query(conn, q, run_ids)
for run_id, tko_job_idx in r:
if run_id in job_idxs:
raise Error('task run ID %s has multiple tko references' %
job_idxs[run_id] = tko_job_idx
return job_idxs
def _get_status_map(conn):
statuses = {}
r = _run_query(conn, 'SELECT status_idx, word FROM tko_status')
for idx, word in r:
statuses[idx] = word
return statuses
def _run_ids_for_request(request_id):
"""Return Swarming run IDs for a given request ID, in ascending order."""
prefix = request_id[:len(request_id)-1]
return [prefix + i for i in ('1', '2')]
def _cursor(conn):
c = conn.cursor()
yield c
def _run_query(conn, q, args=None):
logging.debug('tko: running query %s with args %s', q, args)
with _cursor(conn) as cursor:
if args is not None:
cursor.execute(q, args)
return cursor.fetchall()
if __name__ == '__main__':