| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import array |
| import btsocket |
| import fcntl |
| import logging |
| import socket |
| import struct |
| |
| |
| # Constants from lib/mgmt.h in BlueZ source |
| MGMT_INDEX_NONE = 0xFFFF |
| |
| MGMT_HDR_SIZE = 6 |
| |
| MGMT_STATUS_SUCCESS = 0x00 |
| MGMT_STATUS_UNKNOWN_COMMAND = 0x01 |
| MGMT_STATUS_NOT_CONNECTED = 0x02 |
| MGMT_STATUS_FAILED = 0x03 |
| MGMT_STATUS_CONNECT_FAILED = 0x04 |
| MGMT_STATUS_AUTH_FAILED = 0x05 |
| MGMT_STATUS_NOT_PAIRED = 0x06 |
| MGMT_STATUS_NO_RESOURCES = 0x07 |
| MGMT_STATUS_TIMEOUT = 0x08 |
| MGMT_STATUS_ALREADY_CONNECTED = 0x09 |
| MGMT_STATUS_BUSY = 0x0a |
| MGMT_STATUS_REJECTED = 0x0b |
| MGMT_STATUS_NOT_SUPPORTED = 0x0c |
| MGMT_STATUS_INVALID_PARAMS = 0x0d |
| MGMT_STATUS_DISCONNECTED = 0x0e |
| MGMT_STATUS_NOT_POWERED = 0x0f |
| MGMT_STATUS_CANCELLED = 0x10 |
| MGMT_STATUS_INVALID_INDEX = 0x11 |
| MGMT_STATUS_RFKILLED = 0x12 |
| |
| MGMT_OP_READ_VERSION = 0x0001 |
| MGMT_OP_READ_COMMANDS = 0x0002 |
| MGMT_OP_READ_INDEX_LIST = 0x0003 |
| MGMT_OP_READ_INFO = 0x0004 |
| MGMT_OP_SET_POWERED = 0x0005 |
| MGMT_OP_SET_DISCOVERABLE = 0x0006 |
| MGMT_OP_SET_CONNECTABLE = 0x0007 |
| MGMT_OP_SET_FAST_CONNECTABLE = 0x0008 |
| MGMT_OP_SET_PAIRABLE = 0x0009 |
| MGMT_OP_SET_LINK_SECURITY = 0x000A |
| MGMT_OP_SET_SSP = 0x000B |
| MGMT_OP_SET_HS = 0x000C |
| MGMT_OP_SET_LE = 0x000D |
| MGMT_OP_SET_DEV_CLASS = 0x000E |
| MGMT_OP_SET_LOCAL_NAME = 0x000F |
| MGMT_OP_ADD_UUID = 0x0010 |
| MGMT_OP_REMOVE_UUID = 0x0011 |
| MGMT_OP_LOAD_LINK_KEYS = 0x0012 |
| MGMT_OP_LOAD_LONG_TERM_KEYS = 0x0013 |
| MGMT_OP_DISCONNECT = 0x0014 |
| MGMT_OP_GET_CONNECTIONS = 0x0015 |
| MGMT_OP_PIN_CODE_REPLY = 0x0016 |
| MGMT_OP_PIN_CODE_NEG_REPLY = 0x0017 |
| MGMT_OP_SET_IO_CAPABILITY = 0x0018 |
| MGMT_OP_PAIR_DEVICE = 0x0019 |
| MGMT_OP_CANCEL_PAIR_DEVICE = 0x001A |
| MGMT_OP_UNPAIR_DEVICE = 0x001B |
| MGMT_OP_USER_CONFIRM_REPLY = 0x001C |
| MGMT_OP_USER_CONFIRM_NEG_REPLY = 0x001D |
| MGMT_OP_USER_PASSKEY_REPLY = 0x001E |
| MGMT_OP_USER_PASSKEY_NEG_REPLY = 0x001F |
| MGMT_OP_READ_LOCAL_OOB_DATA = 0x0020 |
| MGMT_OP_ADD_REMOTE_OOB_DATA = 0x0021 |
| MGMT_OP_REMOVE_REMOTE_OOB_DATA = 0x0022 |
| MGMT_OP_START_DISCOVERY = 0x0023 |
| MGMT_OP_STOP_DISCOVERY = 0x0024 |
| MGMT_OP_CONFIRM_NAME = 0x0025 |
| MGMT_OP_BLOCK_DEVICE = 0x0026 |
| MGMT_OP_UNBLOCK_DEVICE = 0x0027 |
| MGMT_OP_SET_DEVICE_ID = 0x0028 |
| MGMT_OP_SET_ADVERTISING = 0x0029 |
| MGMT_OP_SET_BREDR = 0x002A |
| MGMT_OP_SET_STATIC_ADDRESS = 0x002B |
| MGMT_OP_SET_SCAN_PARAMS = 0x002C |
| MGMT_OP_SET_SECURE_CONN = 0x002D |
| MGMT_OP_SET_DEBUG_KEYS = 0x002E |
| MGMT_OP_SET_PRIVACY = 0x002F |
| MGMT_OP_LOAD_IRKS = 0x0030 |
| MGMT_OP_GET_CONN_INFO = 0x0031 |
| MGMT_OP_GET_CLOCK_INFO = 0x0032 |
| MGMT_OP_ADD_DEVICE = 0x0033 |
| MGMT_OP_REMOVE_DEVICE = 0x0034 |
| MGMT_OP_LOAD_CONN_PARAM = 0x0035 |
| MGMT_OP_READ_UNCONF_INDEX_LIST = 0x0036 |
| MGMT_OP_READ_CONFIG_INFO = 0x0037 |
| MGMT_OP_SET_EXTERNAL_CONFIG = 0x0038 |
| MGMT_OP_SET_PUBLIC_ADDRESS = 0x0039 |
| |
| MGMT_EV_CMD_COMPLETE = 0x0001 |
| MGMT_EV_CMD_STATUS = 0x0002 |
| MGMT_EV_CONTROLLER_ERROR = 0x0003 |
| MGMT_EV_INDEX_ADDED = 0x0004 |
| MGMT_EV_INDEX_REMOVED = 0x0005 |
| MGMT_EV_NEW_SETTINGS = 0x0006 |
| MGMT_EV_CLASS_OF_DEV_CHANGED = 0x0007 |
| MGMT_EV_LOCAL_NAME_CHANGED = 0x0008 |
| MGMT_EV_NEW_LINK_KEY = 0x0009 |
| MGMT_EV_NEW_LONG_TERM_KEY = 0x000A |
| MGMT_EV_DEVICE_CONNECTED = 0x000B |
| MGMT_EV_DEVICE_DISCONNECTED = 0x000C |
| MGMT_EV_CONNECT_FAILED = 0x000D |
| MGMT_EV_PIN_CODE_REQUEST = 0x000E |
| MGMT_EV_USER_CONFIRM_REQUEST = 0x000F |
| MGMT_EV_USER_PASSKEY_REQUEST = 0x0010 |
| MGMT_EV_AUTH_FAILED = 0x0011 |
| MGMT_EV_DEVICE_FOUND = 0x0012 |
| MGMT_EV_DISCOVERING = 0x0013 |
| MGMT_EV_DEVICE_BLOCKED = 0x0014 |
| MGMT_EV_DEVICE_UNBLOCKED = 0x0015 |
| MGMT_EV_DEVICE_UNPAIRED = 0x0016 |
| MGMT_EV_PASSKEY_NOTIFY = 0x0017 |
| MGMT_EV_NEW_IRK = 0x0018 |
| MGMT_EV_NEW_CSRK = 0x0019 |
| MGMT_EV_DEVICE_ADDED = 0x001a |
| MGMT_EV_DEVICE_REMOVED = 0x001b |
| MGMT_EV_NEW_CONN_PARAM = 0x001c |
| MGMT_EV_UNCONF_INDEX_ADDED = 0x001d |
| MGMT_EV_UNCONF_INDEX_REMOVED = 0x001e |
| MGMT_EV_NEW_CONFIG_OPTIONS = 0x001f |
| |
| # Settings returned by MGMT_OP_READ_INFO |
| MGMT_SETTING_POWERED = 0x00000001 |
| MGMT_SETTING_CONNECTABLE = 0x00000002 |
| MGMT_SETTING_FAST_CONNECTABLE = 0x00000004 |
| MGMT_SETTING_DISCOVERABLE = 0x00000008 |
| MGMT_SETTING_PAIRABLE = 0x00000010 |
| MGMT_SETTING_LINK_SECURITY = 0x00000020 |
| MGMT_SETTING_SSP = 0x00000040 |
| MGMT_SETTING_BREDR = 0x00000080 |
| MGMT_SETTING_HS = 0x00000100 |
| MGMT_SETTING_LE = 0x00000200 |
| MGMT_SETTING_ADVERTISING = 0x00000400 |
| MGMT_SETTING_SECURE_CONNECTIONS = 0x00000800 |
| MGMT_SETTING_DEBUG_KEYS = 0x00001000 |
| MGMT_SETTING_PRIVACY = 0x00002000 |
| MGMT_SETTING_CONTROLLER_CONFIG = 0x00004000 |
| |
| # Options returned by MGMT_OP_READ_CONFIG_INFO |
| MGMT_OPTION_EXTERNAL_CONFIG = 0x00000001 |
| MGMT_OPTION_PUBLIC_ADDRESS = 0x00000002 |
| |
| # Disconnect reason returned in MGMT_EV_DEVICE_DISCONNECTED |
| MGMT_DEV_DISCONN_UNKNOWN = 0x00 |
| MGMT_DEV_DISCONN_TIMEOUT = 0x01 |
| MGMT_DEV_DISCONN_LOCAL_HOST = 0x02 |
| MGMT_DEV_DISCONN_REMOTE = 0x03 |
| |
| # Flags returned in MGMT_EV_DEVICE_FOUND |
| MGMT_DEV_FOUND_CONFIRM_NAME = 0x01 |
| MGMT_DEV_FOUND_LEGACY_PAIRING = 0x02 |
| |
| |
| # EIR Data field types |
| EIR_FLAGS = 0x01 |
| EIR_UUID16_SOME = 0x02 |
| EIR_UUID16_ALL = 0x03 |
| EIR_UUID32_SOME = 0x04 |
| EIR_UUID32_ALL = 0x05 |
| EIR_UUID128_SOME = 0x06 |
| EIR_UUID128_ALL = 0x07 |
| EIR_NAME_SHORT = 0x08 |
| EIR_NAME_COMPLETE = 0x09 |
| EIR_TX_POWER = 0x0A |
| EIR_CLASS_OF_DEV = 0x0D |
| EIR_SSP_HASH = 0x0E |
| EIR_SSP_RANDOMIZER = 0x0F |
| EIR_DEVICE_ID = 0x10 |
| EIR_GAP_APPEARANCE = 0x19 |
| |
| |
| # Derived from lib/hci.h |
| HCIGETDEVLIST = 0x800448d2 |
| HCIGETDEVINFO = 0x800448d3 |
| |
| HCI_UP = 1 << 0 |
| HCI_INIT = 1 << 1 |
| HCI_RUNNING = 1 << 2 |
| HCI_PSCAN = 1 << 3 |
| HCI_ISCAN = 1 << 4 |
| HCI_AUTH = 1 << 5 |
| HCI_ENCRYPT = 1 << 6 |
| HCI_INQUIRY = 1 << 7 |
| HCI_RAW = 1 << 8 |
| |
| |
| def parse_eir(eirdata): |
| """Parse Bluetooth Extended Inquiry Result (EIR) data structuree. |
| |
| @param eirdata: Encoded eir data structure. |
| |
| @return Dictionary equivalent to the expanded structure keyed by EIR_* |
| fields, with any data members parsed to useful formats. |
| |
| """ |
| fields = {} |
| pos = 0 |
| while pos < len(eirdata): |
| # Byte at the current position is the field length, which should be |
| # zero at the end of the structure. |
| (field_len,) = struct.unpack('B', buffer(eirdata, pos, 1)) |
| if field_len == 0: |
| break |
| # Next byte is the field type, and the rest of the field is the data. |
| # Note that the length field doesn't include itself so that's why the |
| # offsets and lengths look a little odd. |
| (field_type,) = struct.unpack('B', buffer(eirdata, pos + 1, 1)) |
| data = eirdata[pos+2:pos+field_len+1] |
| pos += field_len + 1 |
| # Parse the individual fields to make the data meaningful. |
| if field_type == EIR_NAME_SHORT or field_type == EIR_NAME_COMPLETE: |
| data = data.rstrip('\0') |
| # Place in the dictionary keyed by type. |
| fields[field_type] = data |
| |
| return fields |
| |
| |
| |
| class BluetoothSocketError(Exception): |
| """Error raised for general issues with BluetoothSocket.""" |
| pass |
| |
| class BluetoothInvalidPacketError(Exception): |
| """Error raised when an invalid packet is received from the socket.""" |
| pass |
| |
| class BluetoothControllerError(Exception): |
| """Error raised when the Controller Error event is received.""" |
| pass |
| |
| |
| class BluetoothSocket(btsocket.socket): |
| """Bluetooth Socket. |
| |
| BluetoothSocket wraps the btsocket.socket() class, and thus the system |
| socket.socket() class, to implement the necessary send and receive methods |
| for the HCI Control and Monitor protocols (aka mgmt_ops) of the |
| Linux Kernel. |
| |
| Instantiate either BluetoothControlSocket or BluetoothRawSocket rather |
| than this class directly. |
| |
| See bluez/doc/mgmt_api.txt for details. |
| |
| """ |
| |
| def __init__(self): |
| super(BluetoothSocket, self).__init__(family=btsocket.AF_BLUETOOTH, |
| type=socket.SOCK_RAW, |
| proto=btsocket.BTPROTO_HCI) |
| self.events = [] |
| |
| |
| def send_command(self, code, index, data=''): |
| """Send a command to the socket. |
| |
| To send a command, wait for the reply event, and parse it use |
| send_command_and_wait() instead. |
| |
| @param code: Command Code. |
| @param index: Controller index, may be MGMT_INDEX_NONE. |
| @param data: Parameters as bytearray or str (optional). |
| |
| """ |
| # Send the command to the kernel |
| msg = struct.pack('<HHH', code, index, len(data)) + data |
| |
| length = self.send(msg) |
| if length != len(msg): |
| raise BluetoothSocketError('Short write on socket') |
| |
| |
| def recv_event(self): |
| """Receive a single event from the socket. |
| |
| The event data is not parsed; in the case of command complete events |
| this means it includes both the data for the event and the response |
| for the command. |
| |
| Use settimeout() to set whether this method will block if there is no |
| data, return immediately or wait for a specific length of time before |
| timing out and raising TimeoutError. |
| |
| @return tuple of (event, index, data) |
| |
| """ |
| # Read the message from the socket |
| hdr = bytearray(MGMT_HDR_SIZE) |
| data = bytearray(512) |
| try: |
| (nbytes, ancdata, msg_flags, address) = self.recvmsg_into( |
| (hdr, data)) |
| except btsocket.timeout as e: |
| raise BluetoothSocketError('Error receiving event: %s' % e) |
| if nbytes < MGMT_HDR_SIZE: |
| raise BluetoothInvalidPacketError('Packet shorter than header') |
| |
| # Parse the header |
| (event, index, length) = struct.unpack_from('<HHH', buffer(hdr)) |
| if nbytes < MGMT_HDR_SIZE + length: |
| raise BluetoothInvalidPacketError('Packet shorter than length') |
| |
| return (event, index, data[:length]) |
| |
| |
| def send_command_and_wait(self, cmd_code, cmd_index, cmd_data='', |
| expected_length=None): |
| """Send a command to the socket and wait for the reply. |
| |
| Additional events are appended to the events list of the socket object. |
| |
| @param cmd_code: Command Code. |
| @param cmd_index: Controller index, may be btsocket.HCI_DEV_NONE. |
| @param cmd_data: Parameters as bytearray or str. |
| @param expected_length: May be set to verify the length of the data. |
| |
| Use settimeout() to set whether this method will block if there is no |
| reply, return immediately or wait for a specific length of time before |
| timing out and raising TimeoutError. |
| |
| @return tuple of (status, data) |
| |
| """ |
| self.send_command(cmd_code, cmd_index, cmd_data) |
| |
| while True: |
| (event, index, data) = self.recv_event() |
| |
| if event == MGMT_EV_CMD_COMPLETE: |
| if index != cmd_index: |
| raise BluetoothInvalidPacketError( |
| ('Response for wrong controller index received: ' + |
| '0x%04d (expected 0x%04d)' % (index, cmd_index))) |
| if len(data) < 3: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect command complete event data length: ' + |
| '%d (expected at least 3)' % len(data))) |
| |
| (code, status) = struct.unpack_from('<HB', buffer(data, 0, 3)) |
| logging.debug('[0x%04x] command 0x%04x complete: 0x%02x', |
| index, code, status) |
| |
| if code != cmd_code: |
| raise BluetoothInvalidPacketError( |
| ('Response for wrong command code received: ' + |
| '0x%04d (expected 0x%04d)' % (code, cmd_code))) |
| |
| response_length = len(data) - 3 |
| if (expected_length is not None and |
| response_length != expected_length): |
| raise BluetoothInvalidPacketError( |
| ('Incorrect length of data for response: ' + |
| '%d (expected %d)' % (response_length, |
| expected_length))) |
| |
| return (status, data[3:]) |
| |
| elif event == MGMT_EV_CMD_STATUS: |
| if index != cmd_index: |
| raise BluetoothInvalidPacketError( |
| ('Response for wrong controller index received: ' + |
| '0x%04d (expected 0x%04d)' % (index, cmd_index))) |
| if len(data) != 3: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect command status event data length: ' + |
| '%d (expected 3)' % len(data))) |
| |
| (code, status) = struct.unpack_from('<HB', buffer(data, 0, 3)) |
| logging.debug('[0x%04x] command 0x%02x status: 0x%02x', |
| index, code, status) |
| |
| if code != cmd_code: |
| raise BluetoothInvalidPacketError( |
| ('Response for wrong command code received: ' + |
| '0x%04d (expected 0x%04d)' % (code, cmd_code))) |
| |
| return (status, None) |
| |
| elif event == MGMT_EV_CONTROLLER_ERROR: |
| if len(data) != 1: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect controller error event data length: ' + |
| '%d (expected 1)' % len(data))) |
| |
| (error_code) = struct.unpack_from('<B', buffer(data, 0, 1)) |
| |
| raise BluetoothControllerError('Controller error: %d' % |
| error_code) |
| |
| else: |
| logging.debug('[0x%04x] event 0x%02x length: %d', |
| index, event, len(data)) |
| self.events.append((event, index, data)) |
| |
| |
| def wait_for_events(self, index, events): |
| """Wait for and return the first of a set of events specified. |
| |
| @param index: Controller index of event, may be btsocket.HCI_DEV_NONE. |
| @param events: List of event codes to wait for. |
| |
| Use settimeout() to set whether this method will block if there is no |
| event received, return immediately or wait for a specific length of |
| time before timing out and raising TimeoutError. |
| |
| @return Tuple of (event, data) |
| |
| """ |
| while True: |
| for idx, (event, event_index, data) in enumerate(self.events): |
| if event_index == index and event in events: |
| self.events.pop(idx) |
| return (event, data) |
| |
| (event, event_index, data) = self.recv_event() |
| if event_index == index and event in events: |
| return (event, data) |
| elif event == MGMT_EV_CMD_COMPLETE: |
| if len(data) < 3: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect command complete event data length: ' + |
| '%d (expected at least 3)' % len(data))) |
| |
| (code, status) = struct.unpack_from('<HB', buffer(data, 0, 3)) |
| logging.debug('[0x%04x] command 0x%04x complete: 0x%02x ' |
| '(Ignored)', index, code, status) |
| |
| elif event == MGMT_EV_CMD_STATUS: |
| if len(data) != 3: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect command status event data length: ' + |
| '%d (expected 3)' % len(data))) |
| |
| (code, status) = struct.unpack_from('<HB', buffer(data, 0, 3)) |
| logging.debug('[0x%04x] command 0x%02x status: 0x%02x ' |
| '(Ignored)', index, code, status) |
| |
| elif event == MGMT_EV_CONTROLLER_ERROR: |
| if len(data) != 1: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect controller error event data length: ' + |
| '%d (expected 1)' % len(data))) |
| |
| (error_code) = struct.unpack_from('<B', buffer(data, 0, 1)) |
| logging.debug('[0x%04x] controller error: %d (Ignored)', |
| index, error_code) |
| |
| else: |
| self.events.append((event, index, data)) |
| |
| |
| class BluetoothControlSocket(BluetoothSocket): |
| """Bluetooth Control Socket. |
| |
| BluetoothControlSocket provides convenient methods mapping to each mgmt_ops |
| command that send an appropriately formatted command and parse the response. |
| |
| """ |
| |
| DEFAULT_TIMEOUT = 15 |
| |
| def __init__(self): |
| super(BluetoothControlSocket, self).__init__() |
| self.bind((btsocket.HCI_DEV_NONE, btsocket.HCI_CHANNEL_CONTROL)) |
| self.settimeout(self.DEFAULT_TIMEOUT) |
| |
| # Certain features will depend on the management version and revision, |
| # so check those now. |
| (version, revision) = self.read_version() |
| logging.debug('MGMT API %d.%d', version, revision) |
| self._kernel_confirms_name = ( |
| (version > 1) or ((version == 1) and (revision >= 5))) |
| |
| def read_version(self): |
| """Read the version of the management interface. |
| |
| @return tuple (version, revision) on success, None on failure. |
| |
| """ |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_READ_VERSION, |
| MGMT_INDEX_NONE, |
| expected_length=3) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (version, revision) = struct.unpack_from('<BH', buffer(data)) |
| return (version, revision) |
| |
| |
| def read_supported_commands(self): |
| """Read the supported management commands and events. |
| |
| @return tuple (commands, events) on success, None on failure. |
| |
| """ |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_READ_COMMANDS, |
| MGMT_INDEX_NONE) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| if len(data) < 4: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect length of data for response: ' + |
| '%d (expected at least 4)' % len(data))) |
| |
| (ncommands, nevents) = struct.unpack_from('<HH', buffer(data, 0, 4)) |
| offset = 4 |
| expected_length = offset + (ncommands * 2) + (nevents * 2) |
| if len(data) != expected_length: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect length of data for response: ' + |
| '%d (expected %d)' % (len(data), expected_length))) |
| |
| commands = [] |
| while len(commands) < ncommands: |
| commands.extend(struct.unpack_from('<H', buffer(data, offset, 2))) |
| offset += 2 |
| |
| events = [] |
| while len(events) < nevents: |
| events.extend(struct.unpack_from('<H', buffer(data, offset, 2))) |
| offset += 2 |
| |
| return (commands, events) |
| |
| |
| def read_index_list(self): |
| """Read the list of currently known controllers. |
| |
| @return array of controller indexes on success, None on failure. |
| |
| """ |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_READ_INDEX_LIST, |
| MGMT_INDEX_NONE) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| if len(data) < 2: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect length of data for response: ' + |
| '%d (expected at least 2)' % len(data))) |
| |
| (nindexes,) = struct.unpack_from('<H', buffer(data, 0, 2)) |
| offset = 2 |
| expected_length = offset + (nindexes * 2) |
| if len(data) != expected_length: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect length of data for response: ' + |
| '%d (expected %d)' % (len(data), expected_length))) |
| |
| indexes = [] |
| while len(indexes) < nindexes: |
| indexes.extend(struct.unpack_from('<H', buffer(data, offset, 2))) |
| offset += 2 |
| |
| return indexes |
| |
| |
| def read_info(self, index): |
| """Read the state and basic information of a controller. |
| |
| Address is returned as a string in upper-case hex to match the |
| BlueZ property. |
| |
| @param index: Controller index. |
| |
| @return tuple (address, bluetooth_version, manufacturer, |
| supported_settings, current_settings, |
| class_of_device, name, short_name) on success, |
| None on failure. |
| |
| """ |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_READ_INFO, |
| index, |
| expected_length=280) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (address, bluetooth_version, manufacturer, |
| supported_settings, current_settings, |
| class_of_device_lo, class_of_device_mid, class_of_device_hi, |
| name, short_name) = struct.unpack_from( |
| '<6sBHLL3B249s11s', |
| buffer(data)) |
| |
| return ( |
| ':'.join('%02X' % x |
| for x in reversed(struct.unpack('6B', address))), |
| bluetooth_version, |
| manufacturer, |
| supported_settings, |
| current_settings, |
| (class_of_device_lo |(class_of_device_mid << 8) | |
| (class_of_device_hi << 16)), |
| name.rstrip('\0'), |
| short_name.rstrip('\0')) |
| |
| |
| def set_powered(self, index, powered): |
| """Set the powered state of a controller. |
| |
| @param index: Controller index. |
| @param powered: Whether controller radio should be powered. |
| |
| @return New controller settings on success, None on failure. |
| |
| """ |
| msg_data = struct.pack('<B', bool(powered)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_POWERED, |
| index, |
| msg_data, |
| expected_length=4) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_discoverable(self, index, discoverable, timeout=0): |
| """Set the discoverable state of a controller. |
| |
| @param index: Controller index. |
| @param discoverable: Whether controller should be discoverable. |
| @param timeout: Timeout in seconds before disabling discovery again, |
| ignored when discoverable is False, must not be zero when |
| discoverable is True. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<BH', bool(discoverable), timeout) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_DISCOVERABLE, |
| index, |
| msg_data, |
| expected_length=4) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not discoverable: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_connectable(self, index, connectable): |
| """Set the connectable state of a controller. |
| |
| @param index: Controller index. |
| @param connectable: Whether controller should be connectable. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(connectable)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_CONNECTABLE, |
| index, |
| msg_data, |
| expected_length=4) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not connectable: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_fast_connectable(self, index, connectable): |
| """Set the fast connectable state of a controller. |
| |
| Fast Connectable is a state where page scan parameters are set to favor |
| faster connect times at the expense of higher power consumption. |
| |
| Unlike most other set_* commands, this may only be used when the |
| controller is powered. |
| |
| @param index: Controller index. |
| @param connectable: Whether controller should be fast connectable. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False or the controller is |
| powered down, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(connectable)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_FAST_CONNECTABLE, |
| index, |
| msg_data) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not connectable: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| # This is documented as returning current settings, but doesn't in |
| # our kernel version (probably a bug), so if no data is returned, |
| # pretend that was success. |
| if len(data) == 0: |
| return 0 |
| elif len(data) != 4: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect length of data for response: ' + |
| '%d (expected 4)' % len(data))) |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_pairable(self, index, pairable): |
| """Set the pairable state of a controller. |
| |
| @param index: Controller index. |
| @param pairable: Whether controller should be pairable. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(pairable)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_PAIRABLE, |
| index, |
| msg_data, |
| expected_length=4) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_link_security(self, index, link_security): |
| """Set the link security state of a controller. |
| |
| Toggles the use of link level security (aka Security Mode 3) for a |
| controller. |
| |
| @param index: Controller index. |
| @param link_security: Whether controller should be link_security. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(link_security)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_LINK_SECURITY, |
| index, |
| msg_data, |
| expected_length=4) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not link_security: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_ssp(self, index, ssp): |
| """Set the whether a controller supports Simple Secure Pairing. |
| |
| @param index: Controller index. |
| @param ssp: Whether controller should support SSP. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(ssp)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_SSP, |
| index, |
| msg_data, |
| expected_length=4) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not ssp: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_hs(self, index, hs): |
| """Set the whether a controller supports Bluetooth High Speed. |
| |
| @param index: Controller index. |
| @param hs: Whether controller should support High Speed. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(hs)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_HS, |
| index, |
| msg_data, |
| expected_length=4) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not hs: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_le(self, index, le): |
| """Set the whether a controller supports Bluetooth Low Energy. |
| |
| @param index: Controller index. |
| @param le: Whether controller should support Low Energy. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(le)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_LE, |
| index, |
| msg_data, |
| expected_length=4) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not le: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_device_class(self, index, major, minor): |
| """Set the device class of the controller. |
| |
| Consult the Bluetooth Baseband Assigned Numbers specification for valid |
| values, in general both values are bit fields defined by that |
| specification. |
| |
| If the device class is set while the controller is powered off, 0 will |
| be returned, but the new class will be set by the host subsystem after |
| the controller is powered on. |
| |
| @param index: Controller index. |
| @param major: Major device class. |
| @param minor: Minor device class. |
| |
| @return New three-octet device class on success, None on failure. |
| |
| """ |
| msg_data = struct.pack('<BB', major, minor) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_DEV_CLASS, |
| index, |
| msg_data, |
| expected_length=3) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (class_of_device_lo, class_of_device_mid, |
| class_of_device_hi) = struct.unpack_from('<3B', buffer(data)) |
| return (class_of_device_lo |(class_of_device_mid << 8) | |
| (class_of_device_hi << 16)) |
| |
| |
| def set_local_name(self, index, name, short_name): |
| """Set the local name of the controller. |
| |
| @param index: Controller index. |
| @param name: Full length name, up to 248 characters. |
| @param short_name: Short name, up to 10 characters. |
| |
| @return Tuple of (name, short_name) on success, None on failure. |
| |
| """ |
| # Truncate the provided parameters and then zero-pad using struct |
| # so we pass a fixed-length null-terminated string to the kernel. |
| msg_data = struct.pack('<249s11s', name[:248], short_name[:10]) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_LOCAL_NAME, |
| index, |
| msg_data, |
| expected_length=260) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (name, short_name) = struct.unpack_from('<249s11s', buffer(data)) |
| return (name.rstrip('\0'), short_name.rstrip('\0')) |
| |
| |
| def start_discovery(self, index, address_type): |
| """Start discovering remote devices. |
| |
| Call get_discovered_devices() to retrieve the list of devices found. |
| |
| @param index: Controller index. |
| @param address_type: Address types to discover. |
| |
| @return Address types discovery was started for on success, |
| None on failure. |
| |
| """ |
| msg_data = struct.pack('<B', address_type) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_START_DISCOVERY, |
| index, |
| msg_data, |
| expected_length=1) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (address_type,) = struct.unpack_from('<B', buffer(data)) |
| return address_type |
| |
| |
| def stop_discovery(self, index, address_type): |
| """Stop discovering remote devices. |
| |
| There is usually no need to call this method explicitly as discovery |
| is automatically stopped once it has iterated through the necessary |
| channels. |
| |
| @param index: Controller index. |
| @param address_type: Address types to stop discovering. |
| |
| @return Address types discovery was stopped for on success, |
| None on failure. |
| |
| """ |
| msg_data = struct.pack('<B', address_type) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_STOP_DISCOVERY, |
| index, |
| msg_data, |
| expected_length=1) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (address_type,) = struct.unpack_from('<B', buffer(data)) |
| return address_type |
| |
| |
| def get_discovered_devices(self, index): |
| """Return list of discovered remote devices. |
| |
| This method may be called any time after start_discovery() and will |
| wait until the full list of devices has been returned, there is usually |
| no need to call stop_discovery() explicitly. |
| |
| Use settimeout() to set whether this method will block if there are no |
| events, return immediately or wait for a specific length of time before |
| timing out and raising TimeoutError. |
| |
| @param index: Controller index. |
| |
| @return List of devices found as tuples with the format |
| (address, address_type, rssi, flags, eirdata) |
| |
| """ |
| devices = [] |
| discovering = True |
| while discovering: |
| (event, data) = self.wait_for_events( |
| index, |
| ( MGMT_EV_DISCOVERING, MGMT_EV_DEVICE_FOUND )) |
| |
| if event == MGMT_EV_DISCOVERING: |
| if len(data) != 2: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect discovering event data length: ' + |
| '%d (expected 2)' % len(data))) |
| |
| (address_type, |
| discovering) = struct.unpack_from('<BB', buffer(data)) |
| |
| elif event == MGMT_EV_DEVICE_FOUND: |
| if len(data) < 14: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect device found event data length: ' + |
| '%d (expected at least 14)' % len(data))) |
| |
| (address, address_type, rssi, |
| flags, eir_len) = struct.unpack_from('<6sBbLH', |
| buffer(data, 0, 14)) |
| |
| if len(data) != 14 + eir_len: |
| raise BluetoothInvalidPacketError( |
| ('Incorrect device found event data length: ' + |
| '%d (expected %d)' % (len(data), 14 + eir_len))) |
| |
| devices.append(( |
| ':'.join('%02X' % x |
| for x in reversed( |
| struct.unpack('6B', address))), |
| address_type, |
| rssi, |
| flags, |
| bytes(data[14:]) |
| )) |
| |
| # The kernel might want us to confirm whether or not we |
| # know the name of the device. We don't really care whether |
| # or not this works, we just have to do it to get the EIR |
| # Request. |
| if flags & MGMT_DEV_FOUND_CONFIRM_NAME: |
| msg_data = struct.pack('<6sBB', |
| address, address_type, False) |
| if self._kernel_confirms_name: |
| self.send_command_and_wait( |
| MGMT_OP_CONFIRM_NAME, |
| index, |
| msg_data) |
| else: |
| self.send_command( |
| MGMT_OP_CONFIRM_NAME, |
| index, |
| msg_data) |
| |
| |
| return devices |
| |
| |
| def set_advertising(self, index, advertising): |
| """Set the whether a controller is advertising via LE. |
| |
| @param index: Controller index. |
| @param advertising: Whether controller should advertise via LE. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(advertising)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_ADVERTISING, |
| index, |
| msg_data, |
| expected_length=4) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not advertising: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def set_bredr(self, index, bredr): |
| """Set the whether a controller supports Bluetooth BR/EDR (classic). |
| |
| @param index: Controller index. |
| @param bredr: Whether controller should support BR/EDR. |
| |
| @return New controller settings on success, 0 if the feature is not |
| supported and the parameter was False, None otherwise. |
| |
| """ |
| msg_data = struct.pack('<B', bool(bredr)) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_SET_BREDR, |
| index, |
| msg_data, |
| expected_length=4) |
| if status == MGMT_STATUS_NOT_SUPPORTED and not bredr: |
| return 0 |
| elif status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (current_settings, ) = struct.unpack_from('<L', buffer(data)) |
| return current_settings |
| |
| |
| def add_device(self, index, address, address_type, action): |
| """Add a device to the action list. |
| |
| @param index: Controller index. |
| @param address: Address of the device to add. |
| @param address_type: Type of device in @address. |
| @param action: Action to take. |
| |
| @return Tuple of ( address, address_type ) on success, |
| None on failure. |
| |
| """ |
| msg_data = struct.pack('<6sBB', address, address_type, action) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_ADD_DEVICE, |
| index, |
| msg_data, |
| expected_length=7) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (address, address_type,) = struct.unpack_from('<6sB', buffer(data)) |
| return (address, address_type) |
| |
| |
| def remove_device(self, index, address, address_type): |
| """Remove a device from the action list. |
| |
| @param index: Controller index. |
| @param address: Address of the device to remove. |
| @param address_type: Type of device in @address. |
| |
| @return Tuple of ( address, address_type ) on success, |
| None on failure. |
| |
| """ |
| msg_data = struct.pack('<6sB', address, address_type) |
| (status, data) = self.send_command_and_wait( |
| MGMT_OP_REMOVE_DEVICE, |
| index, |
| msg_data, |
| expected_length=7) |
| if status != MGMT_STATUS_SUCCESS: |
| return None |
| |
| (address, address_type,) = struct.unpack_from('<6sB', buffer(data)) |
| return (address, address_type) |
| |
| |
| class BluetoothRawSocket(BluetoothSocket): |
| """Bluetooth Raw HCI Socket. |
| |
| BluetoothRawSocket is a subclass of BluetoothSocket representing raw access |
| to the HCI controller and providing commands corresponding to ioctls that |
| can be made on that kind of socket. |
| |
| """ |
| |
| def get_dev_info(self, index): |
| """Read HCI device information. |
| |
| This method uses the same underlying ioctl as the hciconfig tool. |
| |
| Address is returned as a string in upper-case hex to match the |
| BlueZ property. |
| |
| @param index: Device index. |
| |
| @return tuple (index, name, address, flags, device_type, bus_type, |
| features, pkt_type, link_policy, link_mode, |
| acl_mtu, acl_pkts, sco_mtu, sco_pkts, |
| err_rx, err_tx, cmd_tx, evt_rx, acl_tx, acl_rx, |
| sco_tx, sco_rx, byte_rx, byte_tx) on success, |
| None on failure. |
| |
| """ |
| buf = array.array('B', [0] * 96) |
| fcntl.ioctl(self.fileno(), HCIGETDEVINFO, buf, 1) |
| |
| ( dev_id, name, address, flags, dev_type, features, pkt_type, |
| link_policy, link_mode, acl_mtu, acl_pkts, sco_mtu, sco_pkts, |
| err_rx, err_tx, cmd_tx, evt_rx, acl_tx, acl_rx, sco_tx, sco_rx, |
| byte_rx, byte_tx ) = struct.unpack_from( |
| '@H8s6sIBQIIIHHHHIIIIIIIIII', buf) |
| |
| return ( |
| dev_id, |
| name.rstrip('\0'), |
| ':'.join('%02X' % x |
| for x in reversed(struct.unpack('6B', address))), |
| flags, |
| (dev_type & 0x30) >> 4, |
| dev_type & 0x0f, |
| features, |
| pkt_type, |
| link_policy, |
| link_mode, |
| acl_mtu, |
| acl_pkts, |
| sco_mtu, |
| sco_pkts, |
| err_rx, |
| err_tx, |
| cmd_tx, |
| evt_rx, |
| acl_tx, |
| acl_rx, |
| sco_tx, |
| sco_rx, |
| byte_rx, |
| byte_tx) |