blob: 617e48d299b965d2dbafb2b52f1f13ec1a3cb9a8 [file] [log] [blame]
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus, dbus.mainloop.glib, dbus.service, gobject, logging, re
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import cros_ui, cros_ui_test
_QUESTION_START = '''
<h5>
The Bluetooth scan discovered the following devices.<br>
If a device is not on the list, switch it into pairing mode and rescan.<br>
<br>
You can click on an input device (e.g., mouse) to pair with it.<br>
</h5>
<table border="1"><tr><td>Address</td><td>Name</td></tr>
'''
_HREF_START = '''<a href="#" onclick="do_submit('%s')">'''
_HREF_END = '''</a>'''
class Agent(dbus.service.Object):
@dbus.service.method("org.bluez.Agent",
in_signature="", out_signature="")
def Release(self):
logging.debug("Agent: Release")
@dbus.service.method("org.bluez.Agent",
in_signature="o", out_signature="s")
def RequestPinCode(self, device):
pin = '0000'
logging.debug('Agent: RequestPinCode (%s), sending %s.', device, pin)
return pin
@dbus.service.method("org.bluez.Agent",
in_signature="", out_signature="")
def Cancel(self):
logging.debug('Agent: Cancel')
class hardware_BluetoothSemiAuto(cros_ui_test.UITest):
version = 1
def initialize(self, creds = '$default'):
cros_ui_test.UITest.initialize(self, creds)
def cleanup(self):
cros_ui_test.UITest.cleanup(self)
self.disconnect_all()
def handle_reply(self, device):
logging.debug("Device created: %s", device)
self.mainloop.quit()
def handle_error(self, error):
logging.debug('Unable to create device: %s', error)
self.mainloop.quit()
def get_bus_adapter(self):
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/"),
"org.bluez.Manager")
adapter = dbus.Interface(bus.get_object("org.bluez",
manager.DefaultAdapter()),
"org.bluez.Adapter")
return (bus, adapter)
def do_connect(self, addr):
logging.debug("do_connect: %s", addr)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
pin_required = addr.endswith('p')
if pin_required:
addr = addr[:-1]
bus, adapter = self.get_bus_adapter()
logging.debug("Creating Agent")
agent_path = "/blueztestagent"
try:
agent = Agent(bus, agent_path)
except Exception, e:
logging.debug('Unable to create an agent: %s', e)
self.mainloop = gobject.MainLoop()
try:
device = adapter.FindDevice(addr)
adapter.RemoveDevice(device)
except Exception, e:
logging.debug('Unable to find/remove device %s: %s', addr, e)
adapter.CreatePairedDevice(addr, agent_path, "DisplayOnly",
reply_handler=self.handle_reply,
error_handler=self.handle_error)
logging.debug('Starting mainloop...')
if pin_required:
# The user will have to enter the pin code on the keyboard. The
# code must be entered after the discovery process starts. The
# problem is that the Agent class defined above does not provide
# enough flexibility to allow the caller to do something after
# pairing started but before it has been completed or timed out.
# This point in time is the closest to the pairing process start.
# So we ask the user to enter the pin code 5 seconds after this
# page closes: pairing starts right after that and 5 seconds is
# enough for the process to be ready to accept user input.
question = 'Enter pin code "0000" on the BT keyboard '
question += 'at least 5 secs after this page closes'
dialog = cros_ui.Dialog(question=question, choices=[],
checkboxes=[], textinputs=[], timeout=3)
dialog.get_entries() # Prompt user to enter pin after a timeout.
dialog.init(choices=[], timeout=1)
dialog.get_entries() # Clear the page to indicate timeout start.
self.mainloop.run()
logging.debug('... mainloop ended.')
device = adapter.FindDevice(addr)
input = dbus.Interface(bus.get_object("org.bluez", device),
"org.bluez.Input")
input.Connect()
logging.debug('Connected to input:%s.', addr)
def disconnect_all(self):
logging.debug('disconnect_all')
_, adapter = self.get_bus_adapter()
for dev in list(adapter.ListDevices()):
logging.debug('disconnecting %s' % dev)
adapter.RemoveDevice(dev)
def run_once(self):
question_prepend = ''
checkboxes = ['BT Mouse', 'Built In Mouse']
textinputs = ['BT Keyboard', 'Built In Keyboard']
while True:
question = question_prepend + _QUESTION_START
hciscan = utils.system_output('hcitool scan')
logging.debug(hciscan)
for line in hciscan.split('\n'):
line = line.strip()
match = re.search(r'^(..:..:..:..:..:..)\s+(.*)$', line)
if match:
addr = match.group(1)
if 'keyboard' in line.lower():
addr += 'p' # Pin's required
question += '<tr>'
question += ('<td>' +
(_HREF_START % addr) + addr + _HREF_END +
'</td>')
question += '<td>' + match.group(2) + '</td>'
question += '</tr>'
question += '</table><br>'
dialog = cros_ui.Dialog(question=question, choices=['Done', 'Rescan'],
checkboxes=checkboxes, textinputs=textinputs)
form_entries = dialog.get_entries()
if not form_entries:
raise error.TestFail('Timeout')
result = form_entries['result']
if result == 'Rescan':
question_prepend = ''
continue
elif result == 'Done':
self.process_form_entries(form_entries, checkboxes, textinputs)
return
logging.debug("Connecting to %s", result)
try:
self.do_connect(result)
question_prepend = 'Paired with device %s.<br>' % result
except Exception, e:
logging.debug('Unable to connect: %s', e)
question_prepend = 'Unable to pair with device %s.<br>' % result
def process_form_entries(self, form_entries, checkboxes, textinputs):
bt_errors = []
for check in checkboxes:
if form_entries.get(check) != 'on':
bt_errors.append('"%s" not checked' % check)
for text in textinputs:
if not form_entries.get(text):
bt_errors.append('no input in "%s" field' % text)
if bt_errors:
raise error.TestFail('Bluetooth input errors:\n%s' %
'\n'.join(bt_errors))