blob: 08b44d257020911eb0af2667fc1ce6b941ba0126 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# This is an integration test which ensures that a proxy set on a
# shared network connection is exposed via LibCrosService and used
# by tlsdated during time synchronization.
import dbus
import gobject
import logging
import subprocess
import threading
import time
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import cros_ui
from autotest_lib.client.cros.networking import shill_proxy
from dbus.mainloop.glib import DBusGMainLoop
from SocketServer import ThreadingTCPServer, StreamRequestHandler
class ProxyHandler(StreamRequestHandler):
    """Request handler for the ThreadedHitServer that notes when an
    expected request is seen.

    Every connection is answered with an HTTP 504 response; connections
    whose first line is the CONNECT request that tlsdated sends are also
    reported to the server through its hit() method.
    """

    # Overrides the write-buffer size inherited from StreamRequestHandler.
    wbufsize = -1

    def handle(self):
        """Reads the first line, up to 40 characters, looking
        for the CONNECT string that tlsdated sends. If it
        is found, the server's hit() method is called.
        All requests receive a HTTP 504 error.
        """
        # The expected request line is exactly 40 characters long, so a
        # 40-character read captures it without the line terminator.
        data = self.rfile.readline(40).strip()
        logging.info('ProxyHandler::handle(): <%s>', data)
        # TODO(wad) Add User-agent check when it lands in tlsdate.
        #           Also, abstract the time server and move this code
        #           into cros/.
        if 'CONNECT clients3.google.com:443 HTTP/1.1' in data:
            self.server.hit()
        # Matched or not, refuse the request so the client gives up quickly.
        self.wfile.write("HTTP/1.1 504 Gateway Timeout\r\n" +
                         "Connection: close\r\n\r\n")
class ThreadedHitServer(ThreadingTCPServer):
    """Threaded TCP server that keeps a running tally of "hits".

    The request handler reports each matching request through hit(); the
    total is read back with hits() and cleared with reset_hits().
    """

    def __init__(self, server_address, HandlerClass):
        """Create the server with its hit counter at zero.

        @param server_address: tuple of server IP and port to listen on.
        @param HandlerClass: the RequestHandler class to instantiate per req.
        """
        # The counter is set up before the base class initializer runs.
        self.reset_hits()
        ThreadingTCPServer.__init__(self, server_address, HandlerClass)

    def hit(self):
        """Record one more matched request. Usually called by the handler."""
        self._hits = self._hits + 1

    def reset_hits(self):
        """Clear the tally of matched requests."""
        self._hits = 0

    def hits(self):
        """Report how many matched requests have been recorded.

        @return the count of matched requests
        """
        return self._hits
class ProxyListener(object):
    """A fake listener for tracking if an expected CONNECT request is
    seen at the provided server address. Any hits are exposed to be
    consumed by the caller.
    """

    def __init__(self, server_address):
        """Constructor

        Creates the server and the thread that will serve requests; the
        thread is not started until run() is called.

        @param server_address: tuple of server IP and port to listen on.
        """
        self._server = ThreadedHitServer(server_address, ProxyHandler)
        self._thread = threading.Thread(target=self._server.serve_forever)

    def run(self):
        """Run the server on a thread

        @raises RuntimeError if called more than once (a thread can only
                be started once).
        """
        self._thread.start()

    def stop(self):
        """Stop the server and its thread, then close the listening socket.

        Safe to call even if run() was never called: shutdown() waits for
        the serve_forever() loop to exit and would block forever if that
        loop is not running, and join() raises on a thread that was never
        started, so both are skipped unless the thread is alive.
        """
        if self._thread.is_alive():
            self._server.shutdown()
            self._thread.join()
        # Always release the port, whether or not the server ever ran.
        self._server.socket.close()

    def reset_hits(self):
        """Reset the number of matched requests to 0"""
        return self._server.reset_hits()

    def hits(self):
        """Get the number of matched requests

        @return the count of matched requests
        """
        return self._server.hits()
class SignalListener(object):
    """A class to listen for DBus signals.

    Received signals are collected in a dict mapping each signal name to
    the list of argument tuples it was delivered with.
    """

    # Default number of seconds wait_for_signals() polls before giving up.
    DEFAULT_TIMEOUT = 60
    _main_loop = None

    def __init__(self, g_main_loop):
        """Constructor

        @param g_main_loop: glib main loop object.
        """
        self._main_loop = g_main_loop
        # Per-instance storage; a class-level dict would be shared (and
        # mutated) by every listener.
        self._signals = {}

    def listen_for_signal(self, signal, interface, path):
        """Listen with a default handler

        @param signal: signal name to listen for
        @param interface: DBus interface to expect it from
        @param path: DBus path associated with the signal
        """
        self.__listen_to_signal(self.__handle_signal, signal, interface, path)

    def wait_for_signals(self, desc,
                         timeout=DEFAULT_TIMEOUT):
        """Block for up to |timeout| seconds waiting for signals to come in.

        @param desc: string describing the high-level reason you're waiting
                     for the signals.
        @param timeout: maximum seconds to wait for the signals.
        @return dict of signal name -> list of argument tuples received
                since the last call; the internal state is cleared.
        @raises TimeoutError if the timeout is hit.
        """
        utils.poll_for_condition(
            condition=self.__received_signals,
            desc=desc,
            timeout=timeout)
        all_signals = self._signals.copy()
        self.__reset_signal_state()
        return all_signals

    def __received_signals(self):
        """Run main loop until all pending events are done, checks for signals.

        Runs self._main_loop until it says it has no more events pending,
        then reports whether any signal has been recorded.

        @return True if at least one signal has been handled, False otherwise.
        """
        context = self._main_loop.get_context()
        # Non-blocking iterations: drain whatever is already pending.
        while context.iteration(False):
            pass
        return len(self._signals) > 0

    def __reset_signal_state(self):
        """Resets internal signal tracking state."""
        self._signals = {}

    def __listen_to_signal(self, callback, signal, interface, path):
        """Connect a callback to a given DBus signal on the system bus.

        Sets up a signal receiver for signal, and calls the provided callback
        when it comes in.

        @param callback: a callable to call when signal is received.
        @param signal: the signal to listen for.
        @param interface: DBus interface the signal is expected from.
        @param path: DBus object path associated with the signal.
        """
        bus = dbus.SystemBus(mainloop=self._main_loop)
        bus.add_signal_receiver(
            handler_function=callback,
            signal_name=signal,
            dbus_interface=interface,
            bus_name=None,
            path=path,
            # Have the signal's name passed to the callback as a keyword
            # argument so one handler can serve several signals.
            member_keyword='signal_name')

    def __handle_signal(self, *args, **kwargs):
        """Callback that records a received signal and its arguments."""
        signal_name = kwargs.pop('signal_name', '')
        logging.info("SIGNAL: %s, %s", signal_name, args)
        self._signals.setdefault(signal_name, []).append(args)
class network_ProxyResolver(test.test):
    """A test fixture for validating the integration of
    shill, Chrome, and tlsdated's proxy resolution.
    """
    version = 1
    auto_login = False
    # Maps shill service name -> ProxyConfig value observed before the test
    # first changed it. initialize() replaces this with a per-instance dict;
    # the class-level value only serves as an empty fallback.
    service_settings = { }
    # Default total time budget, in seconds, for a test method.
    TIMEOUT = 360
    # Address the fake proxy listens on and that is handed to shill.
    PROXY_HOST = '127.0.0.1'
    PROXY_PORT = 3128

    def initialize(self):
        """Prepares the test environment.

        Sets up the test such that all DBus signals can be
        received and a fake proxy server can be instantiated.
        Additionally, the UI is restarted to ensure consistent
        shared network use.

        @raises TestFail if shill cannot be reached.
        """
        super(network_ProxyResolver, self).initialize()
        # Never record saved settings in the dict shared through the class.
        self.service_settings = {}
        cros_ui.stop()
        cros_ui.start()
        DBusGMainLoop(set_as_default=True)
        self._listener = SignalListener(gobject.MainLoop())
        self._shill = shill_proxy.ShillProxy.get_proxy()
        if self._shill is None:
            raise error.TestFail('Could not connect to shill')
        # Listen for network property changes
        self._listener.listen_for_signal('PropertyChanged',
                                         'org.chromium.flimflam.Service',
                                         '/')
        # Listen on the proxy port.
        self._proxy_server = ProxyListener(('', self.PROXY_PORT))

    def set_proxy(self, service_name, proxy_config):
        """Changes the ProxyConfig property on the specified shill service.

        Set the proxy with Shill. This only works for shared connections
        (like Eth). The value seen the first time a service is touched is
        remembered so reset_services() can restore it.

        @param service_name: the name, as a str, of the shill service
        @param proxy_config: the ProxyConfig property value string
        @raises TestFail if the service is not found.
        """
        service = self._shill.find_object('Service', {'Name': service_name})
        if not service:
            raise error.TestFail('Service ' + service_name +
                                 ' not found to test proxy with.')
        props = service.GetProperties()
        old_proxy = props.get('ProxyConfig', '')
        # Only the first observed value is the one worth restoring.
        if service_name not in self.service_settings:
            logging.info('Preexisting ProxyConfig: %s -> %s',
                         service_name, old_proxy)
            self.service_settings[service_name] = old_proxy
        logging.info('Setting proxy to %s', proxy_config)
        service.SetProperties({'ProxyConfig': proxy_config})

    def reset_services(self):
        """Walks the dict of service->ProxyConfig values and sets the
        proxy back to the originally observed value.
        """
        for name, proxy in list(self.service_settings.items()):
            logging.info('Resetting ProxyConfig: %s -> %s', name, proxy)
            self.set_proxy(name, proxy)

    def check_chrome(self, proxy_type, proxy_config, timeout):
        """Check that Chrome has acknowledged the supplied proxy config
        by asking for resolution over DBus.

        Polls roughly once per second.

        @param proxy_type: PAC-style string type (e.g., 'PROXY', 'SOCKS')
        @param proxy_config: PAC-style config string (e.g., 127.0.0.1:1234)
        @param timeout: time in seconds to wait for Chrome to issue a signal.
        @return True if a matching response is seen and False otherwise
        """
        bus = dbus.SystemBus()
        dbus_proxy = bus.get_object('org.chromium.NetworkProxyService',
                                    '/org/chromium/NetworkProxyService')
        service = dbus.Interface(dbus_proxy,
                                 'org.chromium.NetworkProxyServiceInterface')
        expected = proxy_type + ' ' + proxy_config
        # Stays None if the timeout leaves no room for a single attempt.
        result = None
        attempts = timeout
        while attempts > 0:
            result, _ = service.ResolveProxy('https://clients3.google.com')
            if str(result) == expected:
                return True
            attempts -= 1
            time.sleep(1)
        logging.error('Last response seen before giving up: %s', result)
        return False

    def check_tlsdated(self, timeout):
        """Check that tlsdated uses the set proxy.

        Starts the fake proxy server, restarts tlsdated and then polls
        roughly once per second for a matching request.

        @param timeout: time in seconds to wait for tlsdate to restart and query
        @return True if tlsdated hits the proxy server and False otherwise
        """
        try:
            self._proxy_server.run()
        except Exception as e:
            logging.error("Proxy error =>" + str(e))
            return False
        logging.info("proxy started!")
        # Restart tlsdated to force a network resync
        # (The other option is to force it to think there is no network sync.)
        status = subprocess.call(['initctl', 'restart', 'tlsdated'])
        if status != 0:
            logging.info("failed to restart tlsdated")
            return False
        attempts = timeout
        logging.info("waiting for hits on the proxy server")
        while attempts > 0:
            if self._proxy_server.hits() > 0:
                self._proxy_server.reset_hits()
                return True
            time.sleep(1)
            attempts -= 1
        logging.info("no hits")
        return False

    def cleanup(self):
        """Reset all the service data and teardown the proxy.

        The proxy server is torn down and the base class cleanup runs even
        if restoring the services fails.
        """
        try:
            self.reset_services()
        finally:
            try:
                # The attribute is missing if initialize() did not get as
                # far as creating the server.
                proxy_server = getattr(self, '_proxy_server', None)
                if proxy_server is not None:
                    logging.info("tearing down the proxy server")
                    proxy_server.stop()
                    logging.info("proxy server down")
            finally:
                super(network_ProxyResolver, self).cleanup()

    def test_same_ip_proxy_at_signin_chrome_system_tlsdated(
        self,
        service_name,
        test_timeout=TIMEOUT):
        """Points a shill service at the local fake proxy, then verifies
        that Chrome resolves to that proxy and that tlsdated sends its
        request through it.

        @param service_name: shill service name to test on
        @param test_timeout: the total time in seconds split among all timeouts.
        @raises TestFail if Chrome or tlsdated does not use the proxy.
        """
        # TODO(wad) Only do the below if it was a single protocol proxy.
        # proxy_config = proxy_type + '=' + proxy_host + ':' + proxy_port
        proxy_config = '%s:%d' % (self.PROXY_HOST, self.PROXY_PORT)
        self.set_proxy(service_name, '{"mode":"fixed_servers","server":"' +
                                     proxy_config + '"}')
        logging.info("checking chrome")
        if not self.check_chrome('PROXY', proxy_config, test_timeout/3):
            raise error.TestFail('Chrome failed to resolve the proxy')
        # Restart tlsdate to force a network fix
        logging.info("checking tlsdated")
        if not self.check_tlsdated(test_timeout/3):
            raise error.TestFail('tlsdated never tried the proxy')
        logging.info("done!")

    def run_once(self, test_type, **params):
        """Runs the test method named by |test_type|.

        @param test_type: name of the method on this class to invoke.
        @param params: keyword arguments passed through to that method.
        """
        logging.info('client: Running client test %s', test_type)
        getattr(self, test_type)(**params)