blob: 00d894cb17e7663f7a02cfa25b423c22fbff09ba [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
import logging, pprint, time, traceback, sys
import dbus, dbus.mainloop.glib, glib, gobject
from autotest_lib.client.cros import flimflam_test_path
import mm, flimflam
import os
def ExceptionForward(func):
    """Decorate a main-loop callback so its exceptions reach the caller.

    The decorated method must live on an object that has a
    `_forwarded_exception` attribute and a `main_loop` attribute exposing
    `quit()`.  If the wrapped method raises, the exception is logged
    (message and traceback), stored on `self._forwarded_exception`, the
    main loop is asked to quit, and False is returned instead of
    propagating the exception.

    Args:
        func: the method to wrap; called as func(self, *args, **kwargs).

    Returns:
        A wrapper that returns func's result on success and False when
        func raised an Exception.
    """
    # Imported locally so the decorator does not rely on a module-level
    # import that the rest of the file never needed.
    import functools

    @functools.wraps(func)  # keep func's __name__/__doc__ for logs/debugging
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as e:
            logging.warning('Saving exception: %s', e)
            logging.warning(traceback.format_exc())
            # Stash the exception so code running after main_loop.run()
            # can re-raise it outside of the callback.
            self._forwarded_exception = e
            self.main_loop.quit()
            # NOTE(review): for glib timeout/idle sources a False return
            # presumably also prevents the callback being rescheduled.
            return False
    return wrapper
class DisconnnectTesterMainLoop(object):
    """Run an asynchronous test inside a main loop until it completes.

    A subclass is expected to provide:
      * `remaining_requirements`: a set of requirement names; the main
        loop is quit once it becomes empty.
      * `perform_one_test()`: scheduled as an idle callback by run().
      * `after_main_loop()`: called by run() after the loop exits without
        a forwarded exception.  It is not defined on this base class.

    Callbacks decorated with ExceptionForward store their exception in
    `_forwarded_exception`; run() re-raises it after the loop exits.
    """
    version = 1

    def __init__(self, test, main_loop):
        """Remember the owning test object and the main loop to drive.

        Args:
            test: object whose assert_() is used by self.assert_().
            main_loop: object providing run() and quit().
        """
        self._forwarded_exception = None
        self.main_loop = main_loop
        self.test = test

    def assert_(self, arg):
        """Delegate an assertion on `arg` to the owning test object."""
        # self.test.assert_ is already bound to self.test, so only the
        # condition is passed.  Passing this tester as an extra leading
        # argument would make the tester object itself the condition.
        # NOTE(review): assumes test.assert_ takes the condition as its
        # first argument -- confirm against the test base class.
        self.test.assert_(arg)

    @ExceptionForward
    def timeout_main_loop(self):
        """Timeout callback: stop the loop and fail the test.

        Raises:
            error.TestFail: always; ExceptionForward stores it for run().
        """
        logging.warning('Requirements unsatisfied upon timeout: %s',
                        self.remaining_requirements)
        self.main_loop.quit()
        raise error.TestFail('Main loop timed out')

    def requirement_completed(self, requirement,
                              warn_if_already_completed=True):
        """Record that a requirement was completed. Exit if all are.

        Args:
            requirement: name to remove from self.remaining_requirements.
            warn_if_already_completed: if the requirement was not pending,
                log a warning when true; when false, stay quiet and also
                skip the "satisfied" progress message.
        """
        should_log = True
        try:
            self.remaining_requirements.remove(requirement)
        except KeyError:
            if warn_if_already_completed:
                logging.warning(
                    'requirement %s was not present to be completed',
                    requirement)
            else:
                should_log = False

        if not self.remaining_requirements:
            logging.info('All requirements satisfied')
            self.main_loop.quit()
        elif should_log:
            logging.info('Requirement %s satisfied. Remaining: %s',
                         requirement, self.remaining_requirements)

    def perform_one_test(self):
        """Subclasses override this function to do their testing."""
        raise Exception('perform_one_test must be overridden')

    @ExceptionForward
    def generic_dbus_error_handler(self, e):
        """Error callback that turns a failed D-Bus call into a TestFail.

        Raises:
            error.TestFail: always; ExceptionForward stores it for run().
        """
        raise error.TestFail('Dbus call failed: %s' % e)

    def run(self, **kwargs):
        """Run the test in the main loop and report the outcome.

        Args:
            **kwargs: test arguments, stored as self.test_args.
                'timeout_s' (default 10) is the number of seconds after
                which timeout_main_loop is scheduled to fire.

        Raises:
            Exception: whatever exception a decorated callback forwarded
                while the loop was running.
        """
        self.test_args = kwargs
        timeout_ms = int(self.test_args.get('timeout_s', 10) * 1000)
        gobject.timeout_add(timeout_ms, self.timeout_main_loop)
        gobject.idle_add(self.perform_one_test)
        self.main_loop.run()
        if self._forwarded_exception:
            raise self._forwarded_exception
        # Only reached when no callback forwarded an exception.
        self.after_main_loop()
class ModemDisableTester(DisconnnectTesterMainLoop):
    """Disable a modem while a connect is in flight and check the result.

    The main loop finishes once 'connect', 'disable' and 'get_status'
    have all been reported via requirement_completed().

    Test arguments read from self.test_args (all optional):
        async_connect_sleep_ms: fault-injection delay handed to the Gobi
            modem interface; default 0.
        delay_before_disable_ms: base delay before disabling; default 0.
        disable_delay_per_iteration_ms: extra disable delay multiplied by
            self.test.iteration; default 0.
        status_delay_ms: delay between GetStatus calls; default 200.
    """

    def __init__(self, test, main_loop):
        super(ModemDisableTester, self).__init__(test, main_loop)
        self.remaining_requirements = set(['connect', 'disable', 'get_status'])

    def modem_enabled(self):
        """Return the modem's 'Enabled' property, or -1 if it is absent."""
        return self.modem_manager.Properties(self.modem_path).get('Enabled', -1)

    def configure_modem(self):
        """Pick a modem, set up its interfaces and power-cycle it."""
        self.modem_manager, self.modem_path = mm.PickOneModem('')
        self.modem = self.modem_manager.Modem(self.modem_path)
        self.simple_modem = self.modem_manager.SimpleModem(self.modem_path)
        self.gobi_modem = self.modem_manager.GobiModem(self.modem_path)

        if self.gobi_modem:
            sleep_ms = self.test_args.get('async_connect_sleep_ms', 0)
            # Tell the modem manager to sleep this long before completing a
            # connect
            self.gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms)

        # Disable then re-enable before the test starts.
        # NOTE(review): presumably to begin from a known enabled state.
        self.modem.Enable(False)
        self.modem.Enable(True)

    @ExceptionForward
    def perform_one_test(self):
        """Start the connect, then schedule the disable and status polls."""
        self.configure_modem()
        logging.info('connecting')
        retval = self.simple_modem.Connect(
            {},
            reply_handler=self.connect_success_handler,
            error_handler=self.connect_error_handler)
        logging.info('connect call made. retval = %s', retval)

        disable_delay_ms = (
            self.test_args.get('delay_before_disable_ms', 0) +
            self.test.iteration *
            self.test_args.get('disable_delay_per_iteration_ms', 0))
        gobject.timeout_add(disable_delay_ms, self.start_disable)

        self.status_delay_ms = self.test_args.get('status_delay_ms', 200)
        gobject.timeout_add(self.status_delay_ms, self.start_get_status)

    @ExceptionForward
    def connect_success_handler(self, *ignored_args):
        """Connect reply callback: mark 'connect' as completed."""
        logging.info('Reply done')
        self.requirement_completed('connect')

    @ExceptionForward
    def connect_error_handler(self, *ignored_args):
        """Connect error callback: a failed connect also counts as done."""
        logging.info('Reply errored.')
        self.requirement_completed('connect')

    @ExceptionForward
    def start_disable(self):
        """Issue the asynchronous disable and note when it started."""
        logging.info('disabling')
        self.disable_start = time.time()
        self.modem.Enable(False,
                          reply_handler=self.disable_success_handler,
                          error_handler=self.generic_dbus_error_handler)

    @ExceptionForward
    def disable_success_handler(self):
        """Disable reply callback: check it was prompt, mark 'disable'."""
        disable_elapsed = time.time() - self.disable_start
        # disable_elapsed is in seconds (time.time()), while the injected
        # connect delay is configured in milliseconds; convert before
        # adding it to the one-second allowance.
        connect_sleep_s = (
            self.test_args.get('async_connect_sleep_ms', 0) / 1000.0)
        self.assert_(disable_elapsed < 1.0 + connect_sleep_s)
        self.requirement_completed('disable')

    @ExceptionForward
    def start_get_status(self):
        """Issue one asynchronous GetStatus call."""
        # Keep on calling get_status to make sure it works at all times
        self.simple_modem.GetStatus(
            reply_handler=self.get_status_success_handler,
            error_handler=self.generic_dbus_error_handler)

    @ExceptionForward
    def get_status_success_handler(self, status):
        """GetStatus reply callback: mark it done and poll again later."""
        logging.info('Got status')
        # Fires repeatedly, so do not warn once 'get_status' is satisfied.
        self.requirement_completed('get_status',
                                   warn_if_already_completed=False)
        gobject.timeout_add(self.status_delay_ms, self.start_get_status)

    def after_main_loop(self):
        """Check, after the loop exits, that the modem reports disabled."""
        enabled = self.modem_enabled()
        logging.info('Modem enabled: %s', enabled)
        self.assert_(enabled == 0)
class network_3GDisableWhileConnecting(test.test):
    """Test entry point that runs ModemDisableTester in a main loop."""
    version = 1

    def run_once(self, **kwargs):
        """Set up the D-Bus/glib main loop and run one tester pass.

        Args:
            **kwargs: forwarded unchanged to ModemDisableTester.run().
        """
        logging.info('setting main loop')
        # Make the glib loop the default before the gobject main loop is
        # created and any D-Bus work happens inside the tester.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.main_loop = gobject.MainLoop()
        ModemDisableTester(self, self.main_loop).run(**kwargs)