blob: 5024723128101881441f44109c8120583372f3ae [file] [log] [blame]
#!/usr/bin/python2.6
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Validate or replace the standard gdata authorization token."""
import filecmp
import optparse
import os
import shutil
import sys
from chromite.buildbot import constants
from chromite.lib import cros_build_lib as build_lib
from chromite.lib import operation
MODULE = os.path.splitext(os.path.basename(__file__))[0]
oper = operation.Operation(MODULE)
TOKEN_FILE = os.path.join(os.environ['HOME'], '.gdata_token')
CRED_FILE = os.path.join(os.environ['HOME'], '.gdata_cred.txt')
def _ChrootPathToExternalPath(path):
"""Translate |path| inside chroot to external path to same location."""
if path:
return os.path.join(constants.SOURCE_ROOT,
constants.DEFAULT_CHROOT_DIR,
path.lstrip('/'))
return None
class OutsideChroot(object):
"""Class for managing functionality when run outside chroot."""
def __init__(self, args):
self.args = args
def Run(self):
"""Re-start |args| inside chroot and copy out auth file."""
# Note that enter_chroot (cros_sdk) will automatically copy both
# the token file and the cred file into the chroot, so no need
# to do that here.
# Rerun the same command that launched this run inside the chroot.
cmd = [MODULE] + self.args
result = build_lib.RunCommand(cmd, enter_chroot=True,
print_cmd=False, error_code_ok=True)
if result.returncode != 0:
oper.Die('Token validation failed, exit code was %r.' %
result.returncode)
# Copy the token file back from chroot if different.
chroot_token_file = _ChrootPathToExternalPath(TOKEN_FILE)
if not os.path.exists(chroot_token_file):
oper.Die('No token file generated inside chroot.')
elif (not os.path.exists(TOKEN_FILE) or not
filecmp.cmp(TOKEN_FILE, chroot_token_file)):
oper.Notice('Copying new token file from chroot to %r' % TOKEN_FILE)
shutil.copy2(chroot_token_file, TOKEN_FILE)
else:
oper.Notice('No change in token file.')
class InsideChroot(object):
"""Class for managing functionality when run inside chroot.
Note that some additional imports happen within code in this class
because those imports are only available inside the chroot.
"""
def __init__(self):
self.creds = None # gdata_lib.Creds object.
self.gd_client = None # For interacting with Google Docs.
self.it_client = None # For interacting with Issue Tracker.
def _LoadTokenFile(self):
"""Load existing auth token file."""
if not os.path.exists(TOKEN_FILE):
oper.Warning('No current token file at %r.' % TOKEN_FILE)
return False
# Load token file, if it exists.
self.creds.LoadAuthToken(TOKEN_FILE)
return True
def _SaveTokenFile(self):
"""Save to auth toke file if anything changed."""
self.creds.StoreAuthTokenIfNeeded(TOKEN_FILE)
def _ValidateDocsToken(self):
"""Validate the existing Docs token."""
# pylint: disable=W0404
import gdata.service
if not self.creds.docs_auth_token:
return False
oper.Notice('Attempting to log into Docs using auth token.')
self.gd_client.source = 'Package Status'
self.gd_client.SetClientLoginToken(self.creds.docs_auth_token)
try:
# Try to access generic spreadsheets feed, which will check access.
self.gd_client.GetSpreadsheetsFeed()
# Token accepted. We're done here.
oper.Notice('Docs token validated.')
return True
except gdata.service.RequestError as ex:
reason = ex[0]['reason']
if reason == 'Token expired':
return False
raise
def _GenerateDocsToken(self):
"""Generate a new Docs token from credentials."""
# pylint: disable=W0404
import gdata.service
oper.Warning('Docs token not valid. Will try to generate a new one.')
self.creds.LoadCreds(CRED_FILE)
self.gd_client.email = self.creds.user
self.gd_client.password = self.creds.password
try:
self.gd_client.ProgrammaticLogin()
self.creds.SetDocsAuthToken(self.gd_client.GetClientLoginToken())
oper.Notice('New Docs token generated.')
return True
except gdata.service.BadAuthentication:
oper.Error('Credentials from %r not accepted.'
' Unable to generate new Docs token.' % CRED_FILE)
return False
def _ValidateTrackerToken(self):
"""Validate the existing Tracker token."""
# pylint: disable=W0404
import gdata.gauth
import gdata.projecthosting.client
if not self.creds.tracker_auth_token:
return False
oper.Notice('Attempting to log into Tracker using auth token.')
self.it_client.source = 'Package Status'
self.it_client.auth_token = gdata.gauth.ClientLoginToken(
self.creds.tracker_auth_token)
try:
# Try to access Tracker Issue #1, which will check access.
query = gdata.projecthosting.client.Query(issue_id='1')
self.it_client.get_issues('chromium-os', query=query)
# Token accepted. We're done here.
oper.Notice('Tracker token validated.')
return True
except gdata.client.Error:
# Exception is gdata.client.Unauthorized in the case of bad token, but
# I do not know what the error is for an expired token so I do not
# want to limit the catching here. All the errors for gdata.client
# functionality extend gdata.client.Error (I do not see one that is
# obviously about an expired token).
return False
def _GenerateTrackerToken(self):
"""Generate a new Tracker token from credentials."""
# pylint: disable=W0404
import gdata.client
oper.Warning('Tracker token not valid. Will try to generate a new one.')
self.creds.LoadCreds(CRED_FILE)
try:
self.it_client.ClientLogin(self.creds.user, self.creds.password,
source='Package Status', service='code',
account_type='GOOGLE')
self.creds.SetTrackerAuthToken(self.it_client.auth_token.token_string)
oper.Notice('New Tracker token generated.')
return True
except gdata.client.BadAuthentication:
oper.Error('Credentials from %r not accepted.'
' Unable to generate new Tracker token.' % CRED_FILE)
return False
def Run(self):
"""Validate existing auth token or generate new one from credentials."""
# pylint: disable=W0404
import chromite.lib.gdata_lib as gdata_lib
import gdata.spreadsheet.service
self.creds = gdata_lib.Creds()
self.gd_client = gdata.spreadsheet.service.SpreadsheetsService()
self.it_client = gdata.projecthosting.client.ProjectHostingClient()
self._LoadTokenFile()
if not self._ValidateTrackerToken():
if not self._GenerateTrackerToken():
oper.Die('Failed to validate or generate Tracker token.')
if not self._ValidateDocsToken():
if not self._GenerateDocsToken():
oper.Die('Failed to validate or generate Docs token.')
self._SaveTokenFile()
def _CreateParser():
usage = 'Usage: %prog'
epilog = ('\n'
'Run outside of chroot to validate the gdata '
'token file at %r or update it if it has expired.\n'
'To update the token file there must be a valid '
'credentials file at %r.\n'
'If run inside chroot the updated token file is '
'still valid but will not be preserved if chroot\n'
'is deleted.\n' %
(TOKEN_FILE, CRED_FILE))
return optparse.OptionParser(usage=usage, epilog=epilog)
def main(args):
"""Main function."""
# Create a copy of args just to be safe.
args = list(args)
# No actual options used, but --help is still supported.
parser = _CreateParser()
(_options, args) = parser.parse_args(args)
if args:
parser.print_help()
oper.Die('No arguments allowed.')
if build_lib.IsInsideChroot():
InsideChroot().Run()
else:
OutsideChroot(args).Run()