# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.;
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# This is an integration test which ensures that a proxy set on a
# shared network connection is exposed via LibCrosSevice and used
# by tlsdated during time synchronization.

import dbus
import gobject
import logging
import subprocess
import threading
import time

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import cros_ui
# This hacks the path so that we can import shill_proxy.
# pylint: disable=W0611
from autotest_lib.client.cros import flimflam_test_path
# pylint: enable=W0611
import shill_proxy

from dbus.mainloop.glib import DBusGMainLoop
from SocketServer import ThreadingTCPServer, StreamRequestHandler

class ProxyHandler(StreamRequestHandler):
    """Matching request handler for the ThreadedHitServer
       that notes when an expected request is seen.
    """
    wbufsize = -1
    def handle(self):
        """Reads the first line, up to 40 characters, looking
           for the CONNECT string that tlsdated sends. If it
           is found, the server's hit() method is called.

           All requests receive a HTTP 504 error.
        """
        # Read up to 40 characters
        data = self.rfile.readline(40).strip()
        logging.info('ProxyHandler::handle(): <%s>', data)
        # TODO(wad) Add User-agent check when it lands in tlsdate.
        # Also, abstract the time server and move this code into cros/.
        if data.__contains__('CONNECT clients3.google.com:443 HTTP/1.1'):
          self.server.hit()
        self.wfile.write("HTTP/1.1 504 Gateway Timeout\r\n" +
                         "Connection: close\r\n\r\n")

class ThreadedHitServer(ThreadingTCPServer):
    """A threaded TCP server which services requests
       and allows the handler to track "hits".
    """
    def __init__(self, server_address, HandlerClass):
        """Constructor

        @param server_address: tuple of server IP and port to listen on.
        @param HandlerClass: the RequestHandler class to instantiate per req.
        """
        self._hits = 0
        ThreadingTCPServer.__init__(self, server_address, HandlerClass)

    def hit(self):
        """Increment the hit count. Usually called by the HandlerClass"""
        self._hits += 1

    def reset_hits(self):
        """Set the hit count to 0"""
        self._hits = 0

    def hits(self):
        """Get the number of matched requests
        @return the count of matched requests
        """
        return self._hits

class ProxyListener(object):
    """A fake listener for tracking if an expected CONNECT request is
       seen at the provided server address. Any hits are exposed to be
       consumed by the caller.
    """
    def __init__(self, server_address):
        """Constructor

        @param server_address: tuple of server IP and port to listen on.
        """
        self._server = ThreadedHitServer(server_address, ProxyHandler)
        self._thread = threading.Thread(target=self._server.serve_forever)

    def run(self):
        """Run the server on a thread"""
        self._thread.start()

    def stop(self):
        """Stop the server and its threads"""
        self._server.shutdown()
        self._server.socket.close()
        self._thread.join()

    def reset_hits(self):
        """Reset the number of matched requests to 0"""
        return self._server.reset_hits()

    def hits(self):
        """Get the number of matched requests
        @return the count of matched requests
        """
        return self._server.hits()

class SignalListener(object):
    """A class to listen for a DBus signal
    """
    DEFAULT_TIMEOUT = 60
    _main_loop = None
    _signals = { }

    def __init__(self, g_main_loop):
        """Constructor

        @param g_mail_loop: glib main loop object.
        """
        self._main_loop = g_main_loop


    def listen_for_signal(self, signal, interface, path):
        """Listen with a default handler
        @param signal: signal name to listen for
        @param interface: DBus interface to expect it from
        @param path: DBus path associated with the signal
        """
        self.__listen_to_signal(self.__handle_signal, signal, interface, path)


    def wait_for_signals(self, desc,
                         timeout=DEFAULT_TIMEOUT):
        """Block for |timeout| seconds waiting for the signals to come in.

        @param desc: string describing the high-level reason you're waiting
                     for the signals.
        @param timeout: maximum seconds to wait for the signals.

        @raises TimeoutError if the timeout is hit.
        """
        utils.poll_for_condition(
            condition=lambda: self.__received_signals(),
            desc=desc,
            timeout=self.DEFAULT_TIMEOUT)
        all_signals = self._signals.copy()
        self.__reset_signal_state()
        return all_signals


    def __received_signals(self):
        """Run main loop until all pending events are done, checks for signals.

        Runs self._main_loop until it says it has no more events pending,
        then returns the state of the internal variables tracking whether
        desired signals have been received.

        @return True if both signals have been handled, False otherwise.
        """
        context = self._main_loop.get_context()
        while context.iteration(False):
            pass
        return len(self._signals) > 0


    def __reset_signal_state(self):
        """Resets internal signal tracking state."""
        self._signals = { }


    def __listen_to_signal(self, callback, signal, interface, path):
        """Connect a callback to a given session_manager dbus signal.

        Sets up a signal receiver for signal, and calls the provided callback
        when it comes in.

        @param callback: a callable to call when signal is received.
        @param signal: the signal to listen for.
        """
        bus = dbus.SystemBus(mainloop=self._main_loop)
        bus.add_signal_receiver(
            handler_function=callback,
            signal_name=signal,
            dbus_interface=interface,
            bus_name=None,
            path=path,
            member_keyword='signal_name')


    def __handle_signal(self, *args, **kwargs):
        """Callback to be used when a new key signal is received."""
        signal_name = kwargs.pop('signal_name', '')
        #signal_data = str(args[0])
        logging.info("SIGNAL: " + signal_name + ", " + str(args));
        if self._signals.has_key(signal_name):
          self._signals[signal_name].append(args)
        else:
          self._signals[signal_name] = [args]


class network_ProxyResolver(test.test):
    """A test fixture for validating the integration of
       shill, Chrome, and tlsdated's proxy resolution.
    """
    version = 1
    auto_login = False
    service_settings = { }

    TIMEOUT = 360

    def initialize(self):
       """Constructor
          Sets up the test such that all DBus signals can be
          received and a fake proxy server can be instantiated.
          Additionally, the UI is restarted to ensure consistent
          shared network use.
       """
       super(network_ProxyResolver, self).initialize()
       cros_ui.stop()
       cros_ui.start()
       DBusGMainLoop(set_as_default=True)
       self._listener = SignalListener(gobject.MainLoop())
       self._shill = shill_proxy.ShillProxy.get_proxy()
       if self._shill is None:
         raise error.TestFail('Could not connect to shill')
       # Listen for ProxyResolve responses
       self._listener.listen_for_signal('ProxyChange',
                                        'org.chromium.AutotestProxyInterface',
                                        '/org/chromium/LibCrosService')
       # Listen for network property changes
       self._listener.listen_for_signal('PropertyChanged',
                                        'org.chromium.flimflam.Service',
                                        '/')
       # Listen on the proxy port.
       self._proxy_server = ProxyListener(('', 3128))

    # Set the proxy with Shill. This only works for shared connections
    # (like Eth).
    def set_proxy(self, service_name, proxy_config):
        """Changes the ProxyConfig property on the specified shill service.

        @param service_name: the name, as a str, of the shill service
        @param proxy_config: the ProxyConfig property value string

        @raises TestFail if the service is not found.
        """
        shill = self._shill
        service = shill.find_object('Service', { 'Name' : service_name })
        if not service:
            raise error.TestFail('Service ' + service_name +
                                 ' not found to test proxy with.')
        props = service.GetProperties()
        old_proxy = ''
        if props.has_key('ProxyConfig'):
          old_proxy = props['ProxyConfig']
        if self.service_settings.has_key(service_name) == False:
          logging.info('Preexisting ProxyConfig: ' + service_name +
                       ' -> ' + old_proxy)
          self.service_settings[service_name] = old_proxy
        logging.info('Setting proxy to ' + proxy_config)
        service.SetProperties({'ProxyConfig': proxy_config})


    def reset_services(self):
        """Walks the dict of service->ProxyConfig values and sets the
           proxy back to the originally observed value.
        """
        if len(self.service_settings) == 0:
          return
        for k,v in self.service_settings.items():
          logging.info('Resetting ProxyConfig: ' + k + ' -> ' + v)
          self.set_proxy(k, v)


    def check_chrome(self, proxy_type, proxy_config, timeout):
        """Check that Chrome has acknowledged the supplied proxy config
           by asking for resolution over DBus.

        @param proxy_type: PAC-style string type (e.g., 'PROXY', 'SOCKS')
        @param proxy_config: PAC-style config string (e.g., 127.0.0.1:1234)
        @param timeout: time in seconds to wait for Chrome to issue a signal.

        @return True if a matching response is seen and False otherwise
        """
        bus = dbus.SystemBus()
        dbus_proxy = bus.get_object('org.chromium.LibCrosService',
                                    '/org/chromium/LibCrosService')
        cros_service = dbus.Interface(dbus_proxy,
                                      'org.chromium.LibCrosServiceInterface')
        attempts = timeout
        while attempts > 0:
          cros_service.ResolveNetworkProxy(
                                       'https://clients3.google.com',
                                       'org.chromium.AutotestProxyInterface',
                                       'ProxyChange')
          signals = self._listener.wait_for_signals(
                        'waiting for proxy resolution from Chrome')
          if signals['ProxyChange'][0][1] == proxy_type + ' ' + proxy_config:
            return True
          attempts -= 1
          time.sleep(1)
        logging.error('Last DBus signal seen before giving up: ' + str(signals))
        return False

    def check_tlsdated(self, timeout):
        """Check that tlsdated uses the set proxy.
        @param timeout: time in seconds to wait for tlsdate to restart and query
        @return True if tlsdated hits the proxy server and False otherwise
        """
        # Restart tlsdated to force a network resync
        # (The other option is to force it to think there is no network sync.)
        try:
            self._proxy_server.run()
        except Exception as e:
            logging.error("Proxy error =>" + str(e))
            return False
        logging.info("proxy started!")
        status = subprocess.call(['initctl', 'restart', 'tlsdated'])
        if status != 0:
          logging.info("failed to restart tlsdated")
          return False
        attempts = timeout
        logging.info("waiting for hits on the proxy server")
        while attempts > 0:
          if self._proxy_server.hits() > 0:
            self._proxy_server.reset_hits()
            return True
          time.sleep(1)
          attempts -= 1
        logging.info("no hits")
        return False


    def cleanup(self):
        """Reset all the service data and teardown the proxy."""
        self.reset_services()
        logging.info("tearing down the proxy server")
        self._proxy_server.stop()
        logging.info("proxy server down")
        super(network_ProxyResolver, self).cleanup()


    def test_same_ip_proxy_at_signin_chrome_system_tlsdated(
                                                        self,
                                                        service_name,
                                                        test_timeout=TIMEOUT):
        """ Set the user policy, waits for condition, then logs out.

        @param service_name: shill service name to test on
        @param test_timeout: the total time in seconds split among all timeouts.
        """
        proxy_type = 'http'
        proxy_port = '3128'
        proxy_host = '127.0.0.1'
        proxy_url = proxy_type + '://' + proxy_host + ':' + proxy_port
        # TODO(wad) Only do the below if it was a single protocol proxy.
        # proxy_config = proxy_type + '=' + proxy_host + ':' + proxy_port
        proxy_config = proxy_host + ':' + proxy_port
        self.set_proxy(service_name, '{"mode":"fixed_servers","server":"' +
                                     proxy_config + '"}')

        logging.info("checking chrome")
        if self.check_chrome('PROXY', proxy_config, test_timeout/3) == False:
          raise error.TestFail('Chrome failed to resolve the proxy')

        # Restart tlsdate to force a network fix
        logging.info("checking tlsdated")
        if self.check_tlsdated(test_timeout/3) == False:
          raise error.TestFail('tlsdated never tried the proxy')
        logging.info("done!")

    def run_once(self, test_type, **params):
        logging.info('client: Running client test %s', test_type)
        getattr(self, test_type)(**params)
