blob: 2c07dc35479cfb2cf283b5750b5dde9a32456f1f [file] [log] [blame]
#!/usr/bin/python
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Script to sync server_db with Skylab inventory service.
This script is part of migration of autotest infrastructure services to skylab.
It is intended to run regularly in the autotest lab and:
- obtain the server_db information exposed by the skylab inventory service
- (for now) compare with the server_db information in the main AFE database
and create monarch metrics if any discrepancies are found.
- (in the future) inject this information into the server_db in master AFE
database.
"""
import collections
import contextlib
import logging
import optparse
import sys
import subprocess
import yaml
import common
import MySQLdb
from autotest_lib.client.common_lib import global_config
from chromite.lib import metrics
from chromite.lib import ts_mon_config
# Database connection variables. The default credentials are read from the
# 'CROS' section of the global config and fall back to an empty string.
DEFAULT_USER = global_config.global_config.get_config_value(
        'CROS', 'db_backup_user', type=str, default='')
DEFAULT_PASSWD = global_config.global_config.get_config_value(
        'CROS', 'db_backup_password', type=str, default='')
# Name of the mysql database that holds the server tables.
DB = 'chromeos_lab_servers'

# Row types shared by the server db dump and the parsed inventory response,
# so that entries from both sides can be compared directly.
Server = collections.namedtuple(
        'Server', ('hostname', 'cname', 'status', 'note'))
ServerAttribute = collections.namedtuple(
        'ServerAttribute', ('hostname', 'attribute', 'value'))
ServerRole = collections.namedtuple(
        'ServerRole', ('hostname', 'role'))
# Metrics: common prefix of every metric name emitted by this script.
_METRICS_PREFIX = 'chromeos/autotest/skylab_migration/server_db'

# API: the pieces below are joined with '/' to form the URL of the inventory
# service endpoint that lists servers.
API_ROOT = 'https://inventory-dot-chromeos-skylab.googleplex.com/_ah/api'
API = 'infrastructure'
VERSION = 'v1'
SERVER_LIST = 'servers'
class SyncUpExpection(Exception):
    """Error signalling that syncing up the server db failed.

    NOTE: the class name is misspelled ('Expection'); it is kept as is so
    that existing references keep working.
    """
class UpdateDatabaseException(Exception):
    """Error signalling that a database update mysql command failed."""
def server_db_dump(cursor):
    """Dump the server db into a dict of namedtuple lists.

    @param cursor: A mysql cursor object to the server db.

    @returns: a dict with the keys 'servers', 'server_attributes' and
              'server_roles', each mapping to a list of Server,
              ServerAttribute or ServerRole namedtuples respectively.
    """
    def _fetch(query, row_type):
        """Run query and wrap every returned row in row_type."""
        cursor.execute(query)
        # Build a real list rather than relying on map(): callers iterate
        # the result more than once, which a lazy map object would not
        # survive.
        return [row_type._make(row) for row in cursor.fetchall()]

    servers = _fetch('SELECT hostname, cname, status, note FROM servers',
                     Server)
    # Attributes and roles are keyed by server id in the db; join against
    # servers so that they are reported by hostname instead.
    server_attrs = _fetch(
            'SELECT hostname, attribute, value FROM servers t1 JOIN '
            'server_attributes t2 ON t1.id=t2.server_id',
            ServerAttribute)
    server_roles = _fetch(
            'SELECT hostname, role FROM servers t1 JOIN '
            'server_roles t2 ON t1.id=t2.server_id',
            ServerRole)
    return {'servers': servers,
            'server_attributes': server_attrs,
            'server_roles': server_roles}
def inventory_server_list():
    """Fetch the server list from the inventory service API.

    The request is made by running the sso_client command line tool; its
    stderr is merged into the captured output.

    @returns: the raw output of sso_client, i.e. the response in String
              format.
    @raises subprocess.CalledProcessError: if sso_client exits non-zero.
    """
    api_url = '/'.join((API_ROOT, API, VERSION, SERVER_LIST))
    command = ['sso_client', '--url', api_url]
    return subprocess.check_output(command, stderr=subprocess.STDOUT)
def inventory_api_response_parse(response_str, environment):
    """Parse the response from inventory API to namedtuples.

    Only servers whose environment matches (case-insensitively) are kept.
    Hostnames, cnames, statuses, notes, attribute values and roles are
    lower-cased. The 'notes' field of the response is exposed as 'note' on
    the Server namedtuple.

    @param response_str: the String format of the response.
    @param environment: the lab environment of the servers, prod or staging.

    @returns: a dict with the keys 'servers', 'server_attributes' and
              'server_roles', each mapping to a list of Server,
              ServerAttribute or ServerRole namedtuples respectively.
    @raises KeyError: if the response has no 'servers' entry, or a server
                      lacks 'environment', 'hostname', 'status' or 'notes'.
    """
    # safe_load only builds plain Python data, so a response received over
    # the network cannot make the parser construct arbitrary objects.
    summaries = yaml.safe_load(response_str)['servers']
    wanted_environment = environment.lower()

    servers = []
    server_attrs = []
    server_roles = []
    for summary in summaries:
        if summary['environment'].lower() != wanted_environment:
            continue

        # Use one lower-cased hostname for all three tuple kinds so that
        # attributes and roles always refer to the same name as the server
        # entry they belong to.
        hostname = summary['hostname'].lower()

        # cname has unique constraint in DB, empty value should be set to
        # None. A null or absent cname is treated as empty as well.
        cname = (summary.get('cname') or '').lower() or None
        servers.append(Server(hostname=hostname,
                              cname=cname,
                              status=summary['status'].lower(),
                              note=summary['notes'].lower()))

        # A null or absent attributes/roles entry means "none".
        for attribute, value in (summary.get('attributes') or {}).items():
            # Skip the entry whose attribute's value is null
            if value:
                server_attrs.append(ServerAttribute(hostname=hostname,
                                                    attribute=attribute,
                                                    value=value.lower()))

        for role in summary.get('roles') or []:
            server_roles.append(ServerRole(hostname=hostname,
                                           role=role.lower()))

    return {'servers': servers,
            'server_attributes': server_attrs,
            'server_roles': server_roles}
def create_mysql_updates(api_output, db_output, table,
                         server_id_map, warn_only):
    """Build the mysql commands that sync one server db table with inventory.

    First, entries in server_db but not in inventory service are deleted from
    db. Then, entries in inventory service but not in server_db are inserted
    into server_db. This function only generates the commands; it does not
    execute them.

    @param api_output: a dict mapping table name to list of corresponding
                       namedtuples parsed from inventory. This is the only
                       source of truth.
    @param db_output: a dict mapping table name to list of corresponding
                      namedtuples parsed from server db.
    @param table: name of the targeted server_db table. 'servers' and
                  'server_attributes' are handled explicitly; any other value
                  is treated as 'server_roles'.
    @param server_id_map: server hostname to id mapping dict. It is not
                          consulted for the 'servers' table.
    @param warn_only: whether it is warn_only. If yes, a new server_attributes
                      or server_roles entry whose server has no id yet is
                      given the placeholder id 0.

    @returns a list of mysql update commands, e.g.
             ['DELETE FROM a WHERE xx', 'INSERT ...']
    @raises KeyError: if a server_attributes or server_roles entry refers to
                      a hostname missing from server_id_map (for inserts, only
                      when warn_only is not set).
    """
    # NOTE(review): values are interpolated with %r instead of being escaped
    # by the database driver. Confirm that inventory data can never contain
    # quotes or non-ASCII text before relying on these commands.
    logging.info('Checking table %s with inventory service...', table)
    mysql_cmds = []
    db_entries = set(db_output[table])
    api_entries = set(api_output[table])
    delete_entries = db_entries - api_entries
    insert_entries = api_entries - db_entries

    if delete_entries:
        logging.info('\nTable %s is not synced up! Below is a list of entries '
                     'that exist only in server db. These invalid entries '
                     'will be deleted from server db:\n%s',
                     table, delete_entries)
    for entry in delete_entries:
        if table == 'servers':
            cmd = 'DELETE FROM servers WHERE hostname=%r' % entry.hostname
        elif table == 'server_attributes':
            cmd = ('DELETE FROM server_attributes WHERE server_id=%d and '
                   'attribute=%r' %
                   (server_id_map[entry.hostname], entry.attribute))
        else:
            cmd = ('DELETE FROM server_roles WHERE server_id=%d and role=%r' %
                   (server_id_map[entry.hostname], entry.role))
        mysql_cmds.append(cmd)

    if insert_entries:
        logging.info('\nTable %s is not synced up! Below is a list of entries '
                     'that exist only in inventory service. These new entries '
                     'will be inserted in to server db:\n%s',
                     table, insert_entries)
    for entry in insert_entries:
        if table == 'servers':
            # A server row gets its id from the db on insert, so no lookup in
            # server_id_map is needed (a brand new server is not in it yet).
            cname = repr(entry.cname) if entry.cname else 'NULL'
            cmd = ('INSERT INTO servers (hostname, cname, status, note) '
                   'VALUES(%r, %s, %r, %r)' %
                   (entry.hostname, cname, entry.status, entry.note))
            mysql_cmds.append(cmd)
            continue

        # If this is warn_only, it is very likely that the server id for new
        # entry does not exist since the server has not been inserted into
        # servers table yet. For this case, fake it as 0.
        if warn_only and not server_id_map.get(entry.hostname):
            server_id = 0
        else:
            server_id = server_id_map[entry.hostname]

        if table == 'server_attributes':
            cmd = ('INSERT INTO server_attributes (server_id, attribute, '
                   'value) VALUES(%d, %r, %r)' %
                   (server_id, entry.attribute, entry.value))
        else:
            cmd = ('INSERT INTO server_roles (server_id, role) '
                   'VALUES(%d, %r)' % (server_id, entry.role))
        mysql_cmds.append(cmd)
    return mysql_cmds
def parse_options():
    """Parse the command line arguments.

    @returns: a (parser, options, args) tuple: the option parser itself, the
              parsed options and the leftover positional arguments.
    """
    opt_parser = optparse.OptionParser(usage='usage: %prog [options]')
    opt_parser.add_option(
            '-o', '--host', default='localhost',
            help='host of the Autotest DB. Default is localhost.')
    opt_parser.add_option(
            '-u', '--user', default=DEFAULT_USER,
            help='User to login to the Autotest DB. Default is the '
                 'one defined in config file.')
    opt_parser.add_option(
            '-p', '--password', default=DEFAULT_PASSWD,
            help='Password to login to the Autotest DB. Default is '
                 'the one defined in config file.')
    opt_parser.add_option(
            '-w', '--warn_only', action='store_true', default=False,
            help='Only raise warnings if server_db is inconsistent '
                 'with inventory service. Do not attempt to update '
                 'server_db to match inventory service.')
    opt_parser.add_option(
            '-e', '--environment', default='prod',
            help='Environment of the server_db, prod or staging. '
                 'Default is prod')
    parsed_options, leftover_args = opt_parser.parse_args()
    return opt_parser, parsed_options, leftover_args
def verify_options_and_args(options, args):
    """Check that the parsed command line is usable.

    Positional arguments are not accepted, and both a user and a password
    must be available. The reason for a failure is logged as an error.

    @param options: The parsed options to verify.
    @param args: The parsed args to verify.

    @returns: True if verification passes, False otherwise.
    """
    if args:
        message = 'Unknown arguments: ' + str(args)
        logging.error(message)
        return False

    has_credentials = options.user and options.password
    if has_credentials:
        return True

    logging.error('Failed to get the default user of password for Autotest'
                  ' DB. Please specify them through the command line.')
    return False
def _modify_table(cursor, mysql_cmds, table):
    """Helper method to run a list of sql_cmds and report metrics.

    The commands are executed in order on the given cursor. Whether or not
    they succeed, the number of DELETE and of INSERT commands in mysql_cmds is
    reported to the deletion and inserts counters, tagged with the table name
    and the outcome.

    @param cursor: mysql cursor instance.
    @param mysql_cmds: the list of sql cmd to be executed.
    @param table: the name of the table modified.

    @raises UpdateDatabaseException: if any command fails to execute.
    """
    succeed = False
    # Bound up front so that the error message below can always refer to it.
    cmd = None
    try:
        for cmd in mysql_cmds:
            logging.info('running command: %s', cmd)
            cursor.execute(cmd)
        succeed = True
    except Exception as e:
        msg = ('Fail to run the following sql command:\n%s\nError:\n%s\n'
               'All changes made to server db will be rollback.' %
               (cmd, e))
        logging.error(msg)
        raise UpdateDatabaseException(msg)
    finally:
        # Count only the commands of each kind; counting the length of a list
        # of booleans would report the total number of commands for both.
        num_deletes = sum(1 for c in mysql_cmds if c.startswith('DELETE'))
        num_inserts = sum(1 for c in mysql_cmds if c.startswith('INSERT'))
        metrics.Counter(_METRICS_PREFIX + '/deletion').increment_by(
                num_deletes, fields={'table': table, 'succeed': succeed})
        metrics.Counter(_METRICS_PREFIX + '/inserts').increment_by(
                num_inserts, fields={'table': table, 'succeed': succeed})
@contextlib.contextmanager
def _msql_connection_with_transaction(options):
    """Context manager yielding a mysql connection used as one transaction.

    The transaction is committed when the managed block finishes normally
    and rolled back when it raises; the connection is closed in both cases.

    @param options: parsed command line options; host, user and password
                    are used to connect to the server db.
    """
    conn = MySQLdb.connect(options.host, options.user, options.password, DB)
    try:
        try:
            yield conn
        except:
            # Undo everything for any kind of failure, then let it propagate.
            conn.rollback()
            raise
        conn.commit()
    finally:
        conn.close()
@contextlib.contextmanager
def _cursor(conn):
"""mysql connect cursor helper.
@param conn: a Mysql db connection instance.
"""
cursor = conn.cursor()
try:
yield cursor
finally:
cursor.close()
def _main(options):
    """Compare server db with the inventory service and sync it up.

    All tables are handled inside a single database transaction. When
    options.warn_only is set, the differences are only computed and logged;
    no update command is executed.

    @param options: parsed command line options.
    """
    skylab_server_data = inventory_api_response_parse(
            inventory_server_list(), options.environment)

    with _msql_connection_with_transaction(options) as conn, \
            _cursor(conn) as cursor:
        db_output = server_db_dump(cursor)
        # The servers table must go first: updating it changes server ids,
        # and deleting a server also removes the server_attributes and
        # server_roles entries associated with it.
        for table in ['servers', 'server_attributes', 'server_roles']:
            # Re-read the ids for every table so that they reflect any
            # servers inserted or deleted by the previous iteration.
            cursor.execute('SELECT id, hostname FROM servers')
            server_id_map = {hostname: server_id
                             for server_id, hostname in cursor.fetchall()}
            mysql_cmds = create_mysql_updates(skylab_server_data,
                                              db_output,
                                              table,
                                              server_id_map,
                                              options.warn_only)
            if not options.warn_only:
                logging.info('Start updating table %s', table)
                _modify_table(cursor, mysql_cmds, table)
                logging.info(
                        'Successfully synced table %s with inventory service',
                        table)

            # Since there exist cascade deletion in servers table, dump
            # db_output again after syncing up servers table.
            if table == 'servers':
                db_output = server_db_dump(cursor)
def main(argv):
    """Entry point.

    Sets up logging, validates the command line and runs the sync. A tick
    metric tagged with the outcome is emitted whether or not the sync
    succeeds. The process exits with status 1 if the command line is invalid.

    @param argv: the command line; not read here, since the options are
                 parsed by parse_options().
    """
    logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    parser, options, args = parse_options()
    if not verify_options_and_args(options, args):
        parser.print_help()
        sys.exit(1)

    sync_succeed = False
    with ts_mon_config.SetupTsMonGlobalState(service_name='sync_server_db',
                                             indirect=True):
        try:
            _main(options)
            sync_succeed = True
        finally:
            # Runs on success and on failure; a failure keeps propagating.
            tick = metrics.Counter(_METRICS_PREFIX + '/tick')
            tick.increment(fields={'success': sync_succeed})
# Run the sync only when this file is executed as a script.
if __name__ == '__main__':
    main(sys.argv)