# blob: 1592dcca0be790e09a5eb467adf696068ce6b91e [file] [log] [blame] [edit]
"""A simple script to backfill tko_task_references table with throttling."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import collections
import contextlib
import logging
import time
import MySQLdb
class BackfillException(Exception):
    """Error raised when the backfill cannot proceed or fails verification."""
def _parse_args():
    """Parse the command line options for the backfill script.

    Returns:
        An argparse.Namespace with attributes host, user, password, dryrun,
        num_iterations, batch_size and sleep_seconds. When --dryrun is given
        without an iteration limit, num_iterations is capped at 5.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--host', required=True, help='mysql server host')
    parser.add_argument('--user', required=True, help='mysql server user')
    parser.add_argument('--password', required=True,
                        help='mysql server password')
    parser.add_argument('--dryrun', action='store_true', default=False)
    parser.add_argument(
        '--num-iterations',
        default=None,
        type=int,
        help='If set, total number of iterations. Default is no limit.',
    )
    parser.add_argument(
        '--batch-size',
        # Without an explicit type a value given on the command line would
        # arrive as a string while the default is an int.
        type=int,
        default=1000,
        help='Number of tko_jobs rows to read in one iteration',
    )
    parser.add_argument(
        '--sleep-seconds',
        type=int,
        default=1,
        help='Time to sleep between iterations',
    )
    args = parser.parse_args()
    # A dry run with no explicit limit is capped at a handful of iterations.
    if args.dryrun and not args.num_iterations:
        logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.')
        args.num_iterations = 5
    return args
@contextlib.contextmanager
def _mysql_connection(args):
    """Context manager yielding a MySQL connection using chromeos_autotest_db.

    Args:
        args: Parsed command line options; the user, host and password
            attributes are used to connect.

    Yields:
        An open MySQLdb connection with chromeos_autotest_db selected. The
        connection is closed when the context exits.
    """
    conn = MySQLdb.connect(
        user=args.user, host=args.host, passwd=args.password)
    try:
        # Select the database inside the try block so that the connection is
        # still closed if the USE statement fails.
        with _mysql_cursor(conn) as c:
            c.execute('USE chromeos_autotest_db;')
        yield conn
    finally:
        conn.close()
@contextlib.contextmanager
def _autocommit(conn):
    """Context manager that commits on success and rolls back on failure.

    Args:
        conn: A connection object providing commit() and rollback().

    Yields:
        The same connection object.

    Raises:
        Whatever the managed block raised, re-raised after the rollback.
    """
    try:
        yield conn
    except BaseException:
        conn.rollback()
        # Re-raise: finishing the generator normally here would make the
        # context manager suppress the error, hiding failed writes.
        raise
    else:
        conn.commit()
@contextlib.contextmanager
def _mysql_cursor(conn):
    """Context manager yielding a new cursor on conn, closed on exit.

    Args:
        conn: A connection object providing cursor().

    Yields:
        The cursor returned by conn.cursor().
    """
    # closing() calls cursor.close() whether or not the managed block raises.
    with contextlib.closing(conn.cursor()) as cursor:
        yield cursor
def _latest_unfilled_job_idx(conn):
    """Find the tko_job_idx at which the backfill should start.

    If tko_task_references already has rows, the start point is one below the
    smallest tko_job_idx present there. Otherwise it is the largest job_idx in
    tko_jobs.

    Args:
        conn: An open database connection.

    Returns:
        The starting index as a decimal string, or None if tko_jobs is empty
        as well.
    """
    with _mysql_cursor(conn) as c:
        c.execute("""
SELECT tko_job_idx
FROM tko_task_references
ORDER BY tko_job_idx
LIMIT 1
;""")
        r = c.fetchall()
    if r:
        # int() rather than long(): long does not exist on Python 3, and on
        # Python 2 int() promotes to long automatically when needed.
        return str(int(r[0][0]) - 1)

    logging.debug('tko_task_references is empty.'
                  ' Grabbing the latest tko_job_idx to fill.')
    with _mysql_cursor(conn) as c:
        c.execute("""
SELECT job_idx
FROM tko_jobs
ORDER BY job_idx DESC
LIMIT 1
;""")
        r = c.fetchall()
    if r:
        # Return a string here too so both branches agree with each other
        # and with _next_job_idx().
        return str(int(r[0][0]))
    return None
# One row destined for tko_task_references: the tko_jobs index plus the task
# reference and parent task reference read for that job.
_TKOTaskReference = collections.namedtuple(
    '_TKOTaskReference',
    'tko_job_idx task_reference parent_task_reference',
)
# Reads one batch of tko_jobs rows at or below a given job_idx, newest first.
# The %(latest_job_idx)s and %(batch_size)s placeholders are filled in with
# Python %-formatting before the statement is executed.
_SQL_SELECT_TASK_REFERENCES = """
SELECT job_idx, afe_job_id, afe_parent_job_id
FROM tko_jobs
WHERE job_idx <= %(latest_job_idx)s
ORDER BY job_idx DESC
LIMIT %(batch_size)s
;"""
# Multi-row insert into tko_task_references. %(values)s is replaced with a
# comma separated list of pre-rendered value tuples.
_SQL_INSERT_TASK_REFERENCES = """
INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id)
VALUES %(values)s
;"""
# Looks up a single tko_task_references row by tko_job_idx; used to verify
# that an insert took effect.
_SQL_SELECT_TASK_REFERENCE = """
SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s
;"""
def _compute_task_references(conn, latest_job_idx, batch_size):
    """Read one batch of task references out of tko_jobs.

    Args:
        conn: An open database connection.
        latest_job_idx: Highest job_idx to include in the batch.
        batch_size: Maximum number of rows to read.

    Returns:
        A list of _TKOTaskReference, in the order the query returns them
        (descending job_idx). Empty when no rows match.
    """
    query = _SQL_SELECT_TASK_REFERENCES % {
        'batch_size': batch_size,
        'latest_job_idx': latest_job_idx,
    }
    with _mysql_cursor(conn) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
    if rows is None:
        return []
    references = []
    for row in rows:
        references.append(_TKOTaskReference(row[0], row[1], row[2]))
    return references
def _insert_task_references(conn, task_references, dryrun):
    """Insert a batch of task references into tko_task_references.

    Args:
        conn: An open database connection.
        task_references: Sequence of _TKOTaskReference rows to insert.
        dryrun: If true, only log a (possibly truncated) copy of the
            statement and do not touch the database.
    """
    if not task_references:
        # An empty VALUES list would be a SQL syntax error.
        return

    def _sql_string(value):
        # A missing id must become SQL NULL; formatting None with "%s" would
        # store the literal text "None".
        # NOTE(review): assumes task_id / parent_task_id are nullable columns
        # -- confirm against the table schema.
        return 'NULL' if value is None else '"%s"' % value

    values = ', '.join(
        '("afe", %s, %s, %s)' % (
            tr.tko_job_idx,
            _sql_string(tr.task_reference),
            _sql_string(tr.parent_task_reference),
        )
        for tr in task_references
    )
    sql = _SQL_INSERT_TASK_REFERENCES % {'values': values}
    if dryrun:
        # Keep the log readable for large batches: show only head and tail.
        if len(sql) < 200:
            sql_log = sql
        else:
            sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
        logging.debug('Would have run: %s', sql_log)
        # A dry run must not write anything.
        return
    with _autocommit(conn) as txn_conn:
        with _mysql_cursor(txn_conn) as c:
            c.execute(sql)
def _verify_task_references(conn, task_references):
    """Spot-check that a batch made it into tko_task_references.

    Only the last element of task_references is looked up.

    Args:
        conn: An open database connection.
        task_references: The batch of _TKOTaskReference that was inserted.

    Raises:
        BackfillException: If no row is found for the last reference, or the
            first row found carries a different tko_job_idx.
    """
    if not task_references:
        return
    expected_idx = task_references[-1].tko_job_idx
    query = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': expected_idx}
    with _mysql_cursor(conn) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
    if rows and rows[0][0] == expected_idx:
        return
    raise BackfillException(
        'Failed to insert task reference for tko_job_id %s' % expected_idx)
def _next_job_idx(task_references):
    """Return the job index at which the next batch should start.

    Args:
        task_references: Non-empty sequence of objects with a tko_job_idx
            attribute, as produced for the batch just processed.

    Returns:
        One less than the tko_job_idx of the last element, as a decimal
        string.
    """
    # int() rather than long(): long does not exist on Python 3, and on
    # Python 2 int() promotes to long automatically when needed.
    return str(int(task_references[-1].tko_job_idx) - 1)
def main():
    """Run the throttled backfill loop.

    Each iteration reads a batch from tko_jobs, inserts the corresponding
    task references, verifies the insert (skipped in dry-run mode) and then
    sleeps. A fresh database connection is opened for every step. The loop
    ends when a batch comes back empty or the iteration limit is used up.

    Raises:
        BackfillException: If no starting tko_job_idx can be determined.
    """
    logging.basicConfig(level=logging.DEBUG)
    args = _parse_args()

    with _mysql_connection(args) as start_conn:
        tko_job_idx = _latest_unfilled_job_idx(start_conn)
    if tko_job_idx is None:
        raise BackfillException('Failed to get last unfilled tko_job_idx')
    logging.info('First tko_job_idx to fill: %s', tko_job_idx)

    while True:
        logging.info('####################################')
        logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx)

        with _mysql_connection(args) as read_conn:
            batch = _compute_task_references(
                read_conn, tko_job_idx, args.batch_size)
        if not batch:
            logging.info('No more unfilled task references. All done!')
            break

        newest, oldest = batch[0], batch[-1]
        logging.info(
            'Inserting %d task references. tko_job_ids: %d...%d',
            len(batch),
            newest.tko_job_idx,
            oldest.tko_job_idx,
        )
        with _mysql_connection(args) as write_conn:
            _insert_task_references(write_conn, batch, args.dryrun)
        if not args.dryrun:
            with _mysql_connection(args) as verify_conn:
                _verify_task_references(verify_conn, batch)

        tko_job_idx = _next_job_idx(batch)

        # Count down the optional iteration budget.
        if args.num_iterations is not None:
            args.num_iterations -= 1
            if args.num_iterations <= 0:
                break
            logging.info('%d more iterations left', args.num_iterations)

        logging.info('Iteration done. Sleeping for %d seconds',
                     args.sleep_seconds)
        time.sleep(args.sleep_seconds)
# Run the backfill only when executed as a script, not when imported.
if __name__ == '__main__':
    main()