blob: 93ba09cb81706d05788ff6533869de7d7874b9e7 [file] [log] [blame]
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import httplib, logging, os, socket, stat, time
import common
import constants as chromeos_constants, cryptohome, httpd
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
class GoogleAuthServer(object):
"""A mock Google accounts server that can be run in a separate thread
during autotests. By default, it returns happy-signals, accepting any
credentials.
"""
sid = '1234'
lsid = '5678'
token = 'aaaa'
__issue_auth_token_miss_count = 0
__token_auth_miss_count = 0
def __init__(self,
cert_path='/etc/fake_root_ca/mock_server.pem',
key_path='/etc/fake_root_ca/mock_server.key',
ssl_port=443,
port=80,
cl_responder=None,
it_responder=None,
ta_responder=None):
self._client_login = chromeos_constants.CLIENT_LOGIN_URL
self._issue_token = chromeos_constants.ISSUE_AUTH_TOKEN_URL
self._token_auth = chromeos_constants.TOKEN_AUTH_URL
self._test_over = '/webhp'
self._testServer = httpd.SecureHTTPListener(port=ssl_port,
cert_path=cert_path,
key_path=key_path)
sa = self._testServer.getsockname()
logging.info('Serving HTTPS on %s, port %s' % (sa[0], sa[1]))
if cl_responder is None:
cl_responder = self.client_login_responder
if it_responder is None:
it_responder = self.issue_token_responder
if ta_responder is None:
ta_responder = self.token_auth_responder
self._testServer.add_url_handler(self._client_login, cl_responder)
self._testServer.add_url_handler(self._issue_token, it_responder)
self._testServer.add_url_handler(self._token_auth, ta_responder)
self._client_latch = self._testServer.add_wait_url(self._client_login)
self._issue_latch = self._testServer.add_wait_url(self._issue_token)
self._testHttpServer = httpd.HTTPListener(port=port)
self._testHttpServer.add_url_handler(self._test_over,
self.__test_over_responder)
self._over_latch = self._testHttpServer.add_wait_url(self._test_over)
def run(self):
self._testServer.run()
self._testHttpServer.run()
def stop(self):
self._testServer.stop()
self._testHttpServer.stop()
def wait_for_client_login(self, timeout=10):
self._client_latch.wait(timeout)
if not self._client_latch.is_set():
raise error.TestError('Never hit ClientLogin endpoint.')
def wait_for_issue_token(self, timeout=10):
self._issue_latch.wait(timeout)
if not self._issue_latch.is_set():
self.__issue_auth_token_miss_count += 1
logging.error('Never hit IssueAuthToken endpoint.')
def wait_for_test_over(self, timeout=10):
self._over_latch.wait(timeout)
if not self._over_latch.is_set():
self.__token_auth_miss_count += 1
logging.error('Never redirected to /webhp.')
def get_endpoint_misses(self):
results = {}
if (self.__issue_auth_token_miss_count > 0):
results['issue_auth_token_miss'] =self.__issue_auth_token_miss_count
if (self.__token_auth_miss_count > 0):
results['token_auth_miss'] = self.__token_auth_miss_count
return results
def client_login_responder(self, handler, url_args):
logging.info(url_args)
handler.send_response(httplib.OK)
handler.end_headers()
handler.wfile.write('SID=%s\n' % self.sid)
handler.wfile.write('LSID=%s\n' % self.lsid)
def issue_token_responder(self, handler, url_args):
logging.info(url_args)
if url_args['service'].value != chromeos_constants.LOGIN_SERVICE:
handler.send_response(httplib.FORBIDDEN)
handler.end_headers()
handler.wfile.write(chromeos_constants.LOGIN_ERROR)
return
if not (self.sid == url_args['SID'].value and
self.lsid == url_args['LSID'].value):
raise error.TestError('IssueAuthToken called with incorrect args')
handler.send_response(httplib.OK)
handler.end_headers()
handler.wfile.write(self.token)
def token_auth_responder(self, handler, url_args):
logging.info(url_args)
if not self.token == url_args['auth'][0]:
raise error.TestError('TokenAuth called with incorrect args')
if not 'continue' in url_args:
raise error.TestError('TokenAuth called with no continue param')
handler.send_response(httplib.SEE_OTHER)
handler.send_header('Location', url_args['continue'][0])
handler.end_headers()
def __test_over_responder(self, handler, url_args):
handler.send_response(httplib.OK)
handler.end_headers()