| #!/usr/bin/env python |
| |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import dbus |
| import dbus.mainloop.glib |
| import dbus.service |
| import gobject |
| import json |
| import logging |
| import logging.handlers |
| import os |
| import shutil |
| |
| import common |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket |
| from autotest_lib.client.cros import constants |
| from autotest_lib.client.cros import xmlrpc_server |
| |
| |
| class _PinAgent(dbus.service.Object): |
| """The agent handling bluetooth device with a known pin code. |
| |
| _PinAgent overrides RequestPinCode method to return a given pin code. |
| User can use this agent to pair bluetooth device which has a known pin code. |
| |
| """ |
| def __init__(self, pin, *args, **kwargs): |
| super(_PinAgent, self).__init__(*args, **kwargs) |
| self._pin = pin |
| |
| |
| @dbus.service.method('org.bluez.Agent1', in_signature="o", out_signature="s") |
| def RequestPinCode(self, device_path): |
| """Requests pin code for a device. |
| |
| Returns the known pin code for the request. |
| |
| @param device_path: The object path of the device. |
| |
| @returns: The known pin code. |
| |
| """ |
| logging.info('RequestPinCode for %s, return %s', device_path, self._pin) |
| return self._pin |
| |
| |
| class BluetoothDeviceXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate): |
| """Exposes DUT methods called remotely during Bluetooth autotests. |
| |
| All instance methods of this object without a preceding '_' are exposed via |
| an XML-RPC server. This is not a stateless handler object, which means that |
| if you store state inside the delegate, that state will remain around for |
| future calls. |
| """ |
| |
| UPSTART_PATH = 'unix:abstract=/com/ubuntu/upstart' |
| UPSTART_MANAGER_PATH = '/com/ubuntu/Upstart' |
| UPSTART_MANAGER_IFACE = 'com.ubuntu.Upstart0_6' |
| UPSTART_JOB_IFACE = 'com.ubuntu.Upstart0_6.Job' |
| |
| UPSTART_ERROR_UNKNOWNINSTANCE = \ |
| 'com.ubuntu.Upstart0_6.Error.UnknownInstance' |
| |
| BLUETOOTHD_JOB = 'bluetoothd' |
| |
| DBUS_ERROR_SERVICEUNKNOWN = 'org.freedesktop.DBus.Error.ServiceUnknown' |
| |
| BLUEZ_SERVICE_NAME = 'org.bluez' |
| BLUEZ_MANAGER_PATH = '/' |
| BLUEZ_MANAGER_IFACE = 'org.freedesktop.DBus.ObjectManager' |
| BLUEZ_ADAPTER_IFACE = 'org.bluez.Adapter1' |
| BLUEZ_DEVICE_IFACE = 'org.bluez.Device1' |
| BLUEZ_AGENT_MANAGER_PATH = '/org/bluez' |
| BLUEZ_AGENT_MANAGER_IFACE = 'org.bluez.AgentManager1' |
| BLUEZ_PROFILE_MANAGER_PATH = '/org/bluez' |
| BLUEZ_PROFILE_MANAGER_IFACE = 'org.bluez.ProfileManager1' |
| BLUEZ_ERROR_ALREADY_EXISTS = 'org.bluez.Error.AlreadyExists' |
| |
| BLUETOOTH_LIBDIR = '/var/lib/bluetooth' |
| |
| # Timeout for how long we'll wait for BlueZ and the Adapter to show up |
| # after reset. |
| ADAPTER_TIMEOUT = 30 |
| |
| def __init__(self): |
| super(BluetoothDeviceXmlRpcDelegate, self).__init__() |
| |
| # Open the Bluetooth Raw socket to the kernel which provides us direct, |
| # raw, access to the HCI controller. |
| self._raw = bluetooth_socket.BluetoothRawSocket() |
| |
| # Open the Bluetooth Control socket to the kernel which provides us |
| # raw management access to the Bluetooth Host Subsystem. Read the list |
| # of adapter indexes to determine whether or not this device has a |
| # Bluetooth Adapter or not. |
| self._control = bluetooth_socket.BluetoothControlSocket() |
| self._has_adapter = len(self._control.read_index_list()) > 0 |
| |
| # Set up the connection to Upstart so we can start and stop services |
| # and fetch the bluetoothd job. |
| self._upstart_conn = dbus.connection.Connection(self.UPSTART_PATH) |
| self._upstart = self._upstart_conn.get_object( |
| None, |
| self.UPSTART_MANAGER_PATH) |
| |
| bluetoothd_path = self._upstart.GetJobByName( |
| self.BLUETOOTHD_JOB, |
| dbus_interface=self.UPSTART_MANAGER_IFACE) |
| self._bluetoothd = self._upstart_conn.get_object( |
| None, |
| bluetoothd_path) |
| |
| # Arrange for the GLib main loop to be the default. |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| # Set up the connection to the D-Bus System Bus, get the object for |
| # the Bluetooth Userspace Daemon (BlueZ) and that daemon's object for |
| # the Bluetooth Adapter. |
| self._system_bus = dbus.SystemBus() |
| self._update_bluez() |
| self._update_adapter() |
| |
| # The agent to handle pin code request, which will be |
| # created when user calls pair_legacy_device method. |
| self._pin_agent = None |
| |
| |
| def _update_bluez(self): |
| """Store a D-Bus proxy for the Bluetooth daemon in self._bluez. |
| |
| This may be called in a loop until it returns True to wait for the |
| daemon to be ready after it has been started. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| self._bluez = None |
| try: |
| self._bluez = self._system_bus.get_object( |
| self.BLUEZ_SERVICE_NAME, |
| self.BLUEZ_MANAGER_PATH) |
| logging.debug('bluetoothd is running') |
| return True |
| except dbus.exceptions.DBusException, e: |
| if e.get_dbus_name() == self.DBUS_ERROR_SERVICEUNKNOWN: |
| logging.debug('bluetoothd is not running') |
| self._bluez = None |
| return False |
| else: |
| logging.error('Error updating Bluez!') |
| raise |
| |
| |
| def _update_adapter(self): |
| """Store a D-Bus proxy for the local adapter in self._adapter. |
| |
| This may be called in a loop until it returns True to wait for the |
| daemon to be ready, and have obtained the adapter information itself, |
| after it has been started. |
| |
| Since not all devices will have adapters, this will also return True |
| in the case where we have obtained an empty adapter index list from the |
| kernel. |
| |
| @return True on success, including if there is no local adapter, |
| False otherwise. |
| |
| """ |
| self._adapter = None |
| if self._bluez is None: |
| logging.warning('Bluez not found!') |
| return False |
| if not self._has_adapter: |
| logging.debug('Device has no adapter; returning') |
| return True |
| |
| objects = self._bluez.GetManagedObjects( |
| dbus_interface=self.BLUEZ_MANAGER_IFACE) |
| for path, ifaces in objects.iteritems(): |
| logging.debug('%s -> %r', path, ifaces.keys()) |
| if self.BLUEZ_ADAPTER_IFACE in ifaces: |
| logging.debug('using adapter %s', path) |
| self._adapter = self._system_bus.get_object( |
| self.BLUEZ_SERVICE_NAME, |
| path) |
| return True |
| else: |
| logging.warning('No adapter found in interface!') |
| return False |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def reset_on(self): |
| """Reset the adapter and settings and power up the adapter. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| self._reset() |
| if not self._adapter: |
| return False |
| self._set_powered(True) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def reset_off(self): |
| """Reset the adapter and settings, leave the adapter powered off. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| self._reset() |
| return True |
| |
| |
| def has_adapter(self): |
| """Return if an adapter is present. |
| |
| This will only return True if we have determined both that there is |
| a Bluetooth adapter on this device (kernel adapter index list is not |
| empty) and that the Bluetooth daemon has exported an object for it. |
| |
| @return True if an adapter is present, False if not. |
| |
| """ |
| return self._has_adapter and self._adapter is not None |
| |
| |
| def _reset(self): |
| """Reset the Bluetooth adapter and settings.""" |
| logging.debug('_reset') |
| if self._adapter: |
| self._set_powered(False) |
| |
| try: |
| self._bluetoothd.Stop(dbus.Array(signature='s'), True, |
| dbus_interface=self.UPSTART_JOB_IFACE) |
| except dbus.exceptions.DBusException, e: |
| if e.get_dbus_name() != self.UPSTART_ERROR_UNKNOWNINSTANCE: |
| logging.error('Error resetting adapter!') |
| raise |
| |
| def bluez_stopped(): |
| """Checks the bluetooth daemon status. |
| |
| @returns: True if bluez is stopped. False otherwise. |
| |
| """ |
| return not self._update_bluez() |
| |
| logging.debug('waiting for bluez stop') |
| utils.poll_for_condition( |
| condition=bluez_stopped, |
| desc='Bluetooth Daemon has stopped.', |
| timeout=self.ADAPTER_TIMEOUT) |
| |
| for subdir in os.listdir(self.BLUETOOTH_LIBDIR): |
| shutil.rmtree(os.path.join(self.BLUETOOTH_LIBDIR, subdir)) |
| |
| self._bluetoothd.Start(dbus.Array(signature='s'), True, |
| dbus_interface=self.UPSTART_JOB_IFACE) |
| |
| logging.debug('waiting for bluez start') |
| utils.poll_for_condition( |
| condition=self._update_bluez, |
| desc='Bluetooth Daemon has started.', |
| timeout=self.ADAPTER_TIMEOUT) |
| |
| logging.debug('waiting for bluez to obtain adapter information') |
| utils.poll_for_condition( |
| condition=self._update_adapter, |
| desc='Bluetooth Daemon has adapter information.', |
| timeout=self.ADAPTER_TIMEOUT) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_powered(self, powered): |
| """Set the adapter power state. |
| |
| @param powered: adapter power state to set (True or False). |
| |
| @return True on success, False otherwise. |
| |
| """ |
| if not self._adapter: |
| if not powered: |
| # Return success if we are trying to power off an adapter that's |
| # missing or gone away, since the expected result has happened. |
| return True |
| else: |
| logging.warning('Adapter not found!') |
| return False |
| self._set_powered(powered) |
| return True |
| |
| |
| def _set_powered(self, powered): |
| """Set the adapter power state. |
| |
| @param powered: adapter power state to set (True or False). |
| |
| """ |
| logging.debug('_set_powered %r', powered) |
| self._adapter.Set(self.BLUEZ_ADAPTER_IFACE, 'Powered', powered, |
| dbus_interface=dbus.PROPERTIES_IFACE) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_discoverable(self, discoverable): |
| """Set the adapter discoverable state. |
| |
| @param discoverable: adapter discoverable state to set (True or False). |
| |
| @return True on success, False otherwise. |
| |
| """ |
| if not discoverable and not self._adapter: |
| # Return success if we are trying to make an adapter that's |
| # missing or gone away, undiscoverable, since the expected result |
| # has happened. |
| return True |
| self._adapter.Set(self.BLUEZ_ADAPTER_IFACE, |
| 'Discoverable', discoverable, |
| dbus_interface=dbus.PROPERTIES_IFACE) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def set_pairable(self, pairable): |
| """Set the adapter pairable state. |
| |
| @param pairable: adapter pairable state to set (True or False). |
| |
| @return True on success, False otherwise. |
| |
| """ |
| self._adapter.Set(self.BLUEZ_ADAPTER_IFACE, 'Pairable', pairable, |
| dbus_interface=dbus.PROPERTIES_IFACE) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def get_adapter_properties(self): |
| """Read the adapter properties from the Bluetooth Daemon. |
| |
| @return the properties as a JSON-encoded dictionary on success, |
| the value False otherwise. |
| |
| """ |
| objects = self._bluez.GetManagedObjects( |
| dbus_interface=self.BLUEZ_MANAGER_IFACE) |
| adapter = objects[self._adapter.object_path][self.BLUEZ_ADAPTER_IFACE] |
| return json.dumps(adapter) |
| |
| |
| def read_version(self): |
| """Read the version of the management interface from the Kernel. |
| |
| @return the information as a JSON-encoded tuple of: |
| ( version, revision ) |
| |
| """ |
| return json.dumps(self._control.read_version()) |
| |
| |
| def read_supported_commands(self): |
| """Read the set of supported commands from the Kernel. |
| |
| @return the information as a JSON-encoded tuple of: |
| ( commands, events ) |
| |
| """ |
| return json.dumps(self._control.read_supported_commands()) |
| |
| |
| def read_index_list(self): |
| """Read the list of currently known controllers from the Kernel. |
| |
| @return the information as a JSON-encoded array of controller indexes. |
| |
| """ |
| return json.dumps(self._control.read_index_list()) |
| |
| |
| def read_info(self): |
| """Read the adapter information from the Kernel. |
| |
| @return the information as a JSON-encoded tuple of: |
| ( address, bluetooth_version, manufacturer_id, |
| supported_settings, current_settings, class_of_device, |
| name, short_name ) |
| |
| """ |
| return json.dumps(self._control.read_info(0)) |
| |
| |
| def add_device(self, address, address_type, action): |
| """Add a device to the Kernel action list. |
| |
| @param address: Address of the device to add. |
| @param address_type: Type of device in @address. |
| @param action: Action to take. |
| |
| @return on success, a JSON-encoded typle of: |
| ( address, address_type ), None on failure. |
| |
| """ |
| return json.dumps(self._control.add_device( |
| 0, address, address_type, action)) |
| |
| |
| def remove_device(self, address, address_type): |
| """Remove a device from the Kernel action list. |
| |
| @param address: Address of the device to remove. |
| @param address_type: Type of device in @address. |
| |
| @return on success, a JSON-encoded typle of: |
| ( address, address_type ), None on failure. |
| |
| """ |
| return json.dumps(self._control.remove_device( |
| 0, address, address_type)) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def get_devices(self): |
| """Read information about remote devices known to the adapter. |
| |
| @return the properties of each device as a JSON-encoded array of |
| dictionaries on success, the value False otherwise. |
| |
| """ |
| objects = self._bluez.GetManagedObjects( |
| dbus_interface=self.BLUEZ_MANAGER_IFACE, byte_arrays=True) |
| devices = [] |
| for path, ifaces in objects.iteritems(): |
| if self.BLUEZ_DEVICE_IFACE in ifaces: |
| devices.append(objects[path][self.BLUEZ_DEVICE_IFACE]) |
| return json.dumps(devices) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def start_discovery(self): |
| """Start discovery of remote devices. |
| |
| Obtain the discovered device information using get_devices(), called |
| stop_discovery() when done. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| if not self._adapter: |
| return False |
| self._adapter.StartDiscovery(dbus_interface=self.BLUEZ_ADAPTER_IFACE) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def stop_discovery(self): |
| """Stop discovery of remote devices. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| if not self._adapter: |
| return False |
| self._adapter.StopDiscovery(dbus_interface=self.BLUEZ_ADAPTER_IFACE) |
| return True |
| |
| |
| def get_dev_info(self): |
| """Read raw HCI device information. |
| |
| @return JSON-encoded tuple of: |
| (index, name, address, flags, device_type, bus_type, |
| features, pkt_type, link_policy, link_mode, |
| acl_mtu, acl_pkts, sco_mtu, sco_pkts, |
| err_rx, err_tx, cmd_tx, evt_rx, acl_tx, acl_rx, |
| sco_tx, sco_rx, byte_rx, byte_tx) on success, |
| None on failure. |
| |
| """ |
| return json.dumps(self._raw.get_dev_info(0)) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def register_profile(self, path, uuid, options): |
| """Register new profile (service). |
| |
| @param path: Path to the profile object. |
| @param uuid: Service Class ID of the service as string. |
| @param options: Dictionary of options for the new service, compliant |
| with BlueZ D-Bus Profile API standard. |
| |
| @return True on success, False otherwise. |
| |
| """ |
| profile_manager = dbus.Interface( |
| self._system_bus.get_object( |
| self.BLUEZ_SERVICE_NAME, |
| self.BLUEZ_PROFILE_MANAGER_PATH), |
| self.BLUEZ_PROFILE_MANAGER_IFACE) |
| profile_manager.RegisterProfile(path, uuid, options) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def has_device(self, address): |
| """Checks if the device with a given address exists. |
| |
| @param address: Address of the device. |
| |
| @returns: True if there is a device with that address. |
| False otherwise. |
| |
| """ |
| return self._find_device(address) != None |
| |
| |
| def _find_device(self, address): |
| """Finds the device with a given address. |
| |
| Find the device with a given address and returns the |
| device interface. |
| |
| @param address: Address of the device. |
| |
| @returns: An 'org.bluez.Device1' interface to the device. |
| None if device can not be found. |
| |
| """ |
| objects = self._bluez.GetManagedObjects( |
| dbus_interface=self.BLUEZ_MANAGER_IFACE) |
| for path, ifaces in objects.iteritems(): |
| device = ifaces.get(self.BLUEZ_DEVICE_IFACE) |
| if device is None: |
| continue |
| if (device['Address'] == address and |
| path.startswith(self._adapter.object_path)): |
| obj = self._system_bus.get_object( |
| self.BLUEZ_SERVICE_NAME, path) |
| return dbus.Interface(obj, self.BLUEZ_DEVICE_IFACE) |
| logging.error('Device not found') |
| return None |
| |
| |
| def _setup_pin_agent(self, pin): |
| """Initializes a _PinAgent and registers it to handle pin code request. |
| |
| @param pin: The pin code this agent will answer. |
| |
| """ |
| agent_path = '/test/agent' |
| if self._pin_agent: |
| logging.info('Removing the old agent before initializing a new one') |
| self._pin_agent.remove_from_connection() |
| self._pin_agent = None |
| self._pin_agent = _PinAgent(pin, self._system_bus, agent_path) |
| agent_manager = dbus.Interface( |
| self._system_bus.get_object(self.BLUEZ_SERVICE_NAME, |
| self.BLUEZ_AGENT_MANAGER_PATH), |
| self.BLUEZ_AGENT_MANAGER_IFACE) |
| try: |
| agent_manager.RegisterAgent(agent_path, 'NoInputNoOutput') |
| except dbus.exceptions.DBusException, e: |
| if e.get_dbus_name() == self.BLUEZ_ERROR_ALREADY_EXISTS: |
| logging.info('Unregistering old agent and registering the new') |
| agent_manager.UnregisterAgent(agent_path) |
| agent_manager.RegisterAgent(agent_path, 'NoInputNoOutput') |
| else: |
| logging.error('Error setting up pin agent: %s', e) |
| raise |
| logging.info('Agent registered') |
| |
| |
| def _is_paired(self, device): |
| """Checks if a device is paired. |
| |
| @param device: An 'org.bluez.Device1' interface to the device. |
| |
| @returns: True if device is paired. False otherwise. |
| |
| """ |
| props = dbus.Interface(device, dbus.PROPERTIES_IFACE) |
| paired = props.Get(self.BLUEZ_DEVICE_IFACE, 'Paired') |
| return bool(paired) |
| |
| |
| def _is_connected(self, device): |
| """Checks if a device is connected. |
| |
| @param device: An 'org.bluez.Device1' interface to the device. |
| |
| @returns: True if device is connected. False otherwise. |
| |
| """ |
| props = dbus.Interface(device, dbus.PROPERTIES_IFACE) |
| connected = props.Get(self.BLUEZ_DEVICE_IFACE, 'Connected') |
| logging.info('Got connected = %r', connected) |
| return bool(connected) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def pair_legacy_device(self, address, pin, timeout): |
| """Pairs a device with a given pin code. |
| |
| Registers a agent who handles pin code request and |
| pairs a device with known pin code. |
| |
| @param address: Address of the device to pair. |
| @param pin: The pin code of the device to pair. |
| @param timeout: The timeout in seconds for pairing. |
| |
| @returns: True on success. False otherwise. |
| |
| """ |
| device = self._find_device(address) |
| if not device: |
| logging.error('Device not found') |
| return False |
| if self._is_paired(device): |
| logging.info('Device is already paired') |
| return True |
| |
| self._setup_pin_agent(pin) |
| mainloop = gobject.MainLoop() |
| |
| |
| def pair_reply(): |
| """Handler when pairing succeeded.""" |
| logging.info('Device paired') |
| mainloop.quit() |
| |
| |
| def pair_error(error): |
| """Handler when pairing failed. |
| |
| @param error: one of errors defined in org.bluez.Error representing |
| the error in pairing. |
| |
| """ |
| try: |
| error_name = error.get_dbus_name() |
| if error_name == 'org.freedesktop.DBus.Error.NoReply': |
| logging.error('Timed out. Cancelling pairing') |
| device.CancelPairing() |
| else: |
| logging.error('Pairing device failed: %s', error) |
| finally: |
| mainloop.quit() |
| |
| |
| device.Pair(reply_handler=pair_reply, error_handler=pair_error, |
| timeout=timeout * 1000) |
| mainloop.run() |
| return self._is_paired(device) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def remove_device_object(self, address): |
| """Removes a device object and the pairing information. |
| |
| Calls RemoveDevice method to remove remote device |
| object and the pairing information. |
| |
| @param address: Address of the device to unpair. |
| |
| @returns: True on success. False otherwise. |
| |
| """ |
| device = self._find_device(address) |
| if not device: |
| logging.error('Device not found') |
| return False |
| self._adapter.RemoveDevice( |
| device.object_path, dbus_interface=self.BLUEZ_ADAPTER_IFACE) |
| return True |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def connect_device(self, address): |
| """Connects a device. |
| |
| Connects a device if it is not connected. |
| |
| @param address: Address of the device to connect. |
| |
| @returns: True on success. False otherwise. |
| |
| """ |
| device = self._find_device(address) |
| if not device: |
| logging.error('Device not found') |
| return False |
| if self._is_connected(device): |
| logging.info('Device is already connected') |
| return True |
| device.Connect() |
| return self._is_connected(device) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def device_is_connected(self, address): |
| """Checks if a device is connected. |
| |
| @param address: Address of the device to connect. |
| |
| @returns: True if device is connected. False otherwise. |
| |
| """ |
| device = self._find_device(address) |
| if not device: |
| logging.error('Device not found') |
| return False |
| return self._is_connected(device) |
| |
| |
| @xmlrpc_server.dbus_safe(False) |
| def disconnect_device(self, address): |
| """Disconnects a device. |
| |
| Disconnects a device if it is connected. |
| |
| @param address: Address of the device to disconnect. |
| |
| @returns: True on success. False otherwise. |
| |
| """ |
| device = self._find_device(address) |
| if not device: |
| logging.error('Device not found') |
| return False |
| if not self._is_connected(device): |
| logging.info('Device is not connected') |
| return True |
| device.Disconnect() |
| return not self._is_connected(device) |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.DEBUG) |
| handler = logging.handlers.SysLogHandler(address='/dev/log') |
| formatter = logging.Formatter( |
| 'bluetooth_device_xmlrpc_server: [%(levelname)s] %(message)s') |
| handler.setFormatter(formatter) |
| logging.getLogger().addHandler(handler) |
| logging.debug('bluetooth_device_xmlrpc_server main...') |
| server = xmlrpc_server.XmlRpcServer( |
| 'localhost', |
| constants.BLUETOOTH_DEVICE_XMLRPC_SERVER_PORT) |
| server.register_delegate(BluetoothDeviceXmlRpcDelegate()) |
| server.run() |