#!/usr/bin/env python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import cmd
import threading
from functools import wraps

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import gobject


# Prefix shared by the standard D-Bus error names that handle_errors matches
# against (e.g. DBUS_ERROR + '.ServiceUnknown').
DBUS_ERROR = 'org.freedesktop.DBus.Error'
# Prefix prepended to the short object names typed at the console (e.g.
# "nfc0") to form a neard D-Bus object path.
NEARD_PATH = '/org/neard/'
# Prompt shown by the interactive console.
PROMPT = 'NFC> '

class NfcClientException(Exception):
    """Exception class for exceptions thrown by NfcClient."""


def print_message(message, newlines=2):
    """
    Prints the given message with extra wrapping newline characters.

    @param message: Message to print. Must be a string, since it is
            concatenated with the padding.
    @param newlines: Integer, specifying the number of newline characters
            that should be padded at the beginning and end of |message|
            before being passed to "print". "print" itself still appends
            its usual trailing newline.

    """
    padding = newlines * '\n'
    # The parenthesized single-argument form prints the same text whether
    # "print" is the Python 2 statement or the Python 3 function.
    print(padding + message + padding)


def handle_errors(func):
    """
    Decorator for handling exceptions that are commonly raised by many of the
    methods in NfcClient.

    Any exception raised by |func| is reported through print_message instead
    of being propagated; in that case the wrapper returns None. Callers
    therefore treat a None result as "the call failed and was already
    reported".

    @param func: The function this decorator is wrapping.

    @return A wrapper that forwards all positional and keyword arguments to
            |func| and returns its result.

    """
    @wraps(func)
    def _error_handler(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except dbus.exceptions.DBusException as e:
            error_name = e.get_dbus_name()
            if error_name == DBUS_ERROR + '.ServiceUnknown':
                print_message('neard may have crashed or disappeared. '
                              'Check if neard is running and run "initialize" '
                              'from this shell.')
            elif error_name == DBUS_ERROR + '.UnknownObject':
                print_message('Could not find object.')
            else:
                print_message(str(e))
        except Exception as e:
            # Deliberate catch-all: this is the console's top-level error
            # boundary, so a failing command must not kill the shell.
            print_message(str(e))
        return None
    return _error_handler


class NfcClient(object):
    """
    neard D-Bus client.

    Holds a proxy to the neard Manager object and a cache of adapter proxies
    keyed by D-Bus object path. A GLib main loop is iterated on a separate
    thread so that D-Bus signals can be handled while the console blocks on
    user input.

    """
    NEARD_SERVICE_NAME = 'org.neard'
    IMANAGER = NEARD_SERVICE_NAME + '.Manager'
    IADAPTER = NEARD_SERVICE_NAME + '.Adapter'
    ITAG = NEARD_SERVICE_NAME + '.Tag'
    IRECORD = NEARD_SERVICE_NAME + '.Record'
    IDEVICE = NEARD_SERVICE_NAME + '.Device'

    def __init__(self):
        self._mainloop = None
        self._mainloop_thread = None
        # Flag polled by the main loop thread; cleared by end().
        self._run_loop = False
        self._bus = None
        self._manager = None
        # Signal matches for the Manager's AdapterAdded/AdapterRemoved
        # signals, kept so they can be removed again on restart.
        self._manager_signal_matches = []
        # Adapter object path -> adapter proxy.
        self._adapters = {}
        # Adapter object path -> signal match for 'PropertyChanged'.
        self._adapter_property_handler_matches = {}

    def begin(self):
        """
        Starts the D-Bus client.

        Starts the main loop thread, connects to the system bus and sets up
        the manager proxy. If connecting to the bus raises, the main loop
        thread is stopped again before the exception is re-raised.

        """
        # Here we run a GLib MainLoop in its own thread, so that the client can
        # listen to D-Bus signals while keeping the console interactive.
        self._dbusmainloop = dbus.mainloop.glib.DBusGMainLoop(
                set_as_default=True)
        dbus.mainloop.glib.threads_init()
        gobject.threads_init()

        # Create the loop and raise the run flag before the thread starts, so
        # that end() never sees a missing loop and can never have its
        # "stop" request overwritten by the thread's own initialization.
        self._mainloop = gobject.MainLoop()
        self._run_loop = True

        def _mainloop_thread_func():
            context = self._mainloop.get_context()
            while self._run_loop:
                context.iteration(True)

        self._mainloop_thread = threading.Thread(None, _mainloop_thread_func)
        self._mainloop_thread.start()

        try:
            self._bus = dbus.SystemBus()
            self.setup_manager()
        except Exception:
            # The main loop thread is not a daemon thread; leaving it running
            # would keep the process alive after the failure.
            self.end()
            raise

    def end(self):
        """
        Stops the D-Bus client.

        Safe to call more than once and before begin().

        """
        self._run_loop = False
        if self._mainloop is not None:
            # NOTE(review): quit() is relied upon to wake the blocking
            # context.iteration(True) call in the thread, exactly as the
            # original code did - confirm against the GLib documentation.
            self._mainloop.quit()
        if self._mainloop_thread is not None:
            self._mainloop_thread.join()
            self._mainloop_thread = None

    def restart(self):
        """Reinitializes the NFC client."""
        self.setup_manager()

    @handle_errors
    def _get_manager_proxy(self):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, '/'),
                self.IMANAGER)

    @handle_errors
    def _get_adapter_proxy(self, adapter):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, adapter),
                self.IADAPTER)

    def _get_cached_adapter_proxy(self, adapter):
        """
        Returns the cached proxy for |adapter|.

        @param adapter: string, containing the adapter's D-Bus object path.

        @raises NfcClientException if no proxy is cached for |adapter|.

        """
        adapter_proxy = self._adapters.get(adapter, None)
        if adapter_proxy is None:
            raise NfcClientException('Adapter "' + adapter + '" not found.')
        return adapter_proxy

    @handle_errors
    def _get_tag_proxy(self, tag):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, tag),
                self.ITAG)

    @handle_errors
    def _get_device_proxy(self, device):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, device),
                self.IDEVICE)

    @handle_errors
    def _get_record_proxy(self, record):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, record),
                self.IRECORD)

    @handle_errors
    def _get_adapter_properties(self, adapter):
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        return adapter_proxy.GetProperties()

    def _get_adapters(self):
        """
        Returns the 'Adapters' property of the Manager, or None if absent.

        @raises NfcClientException if there is no manager proxy.

        """
        if self._manager is None:
            raise NfcClientException(
                    'No connection to the neard Manager. Run "initialize".')
        props = self._manager.GetProperties()
        return props.get('Adapters', None)

    def _remove_signal_matches(self):
        """Removes every signal match this client has registered."""
        for match in self._manager_signal_matches:
            match.remove()
        self._manager_signal_matches = []
        for match in self._adapter_property_handler_matches.values():
            match.remove()
        self._adapter_property_handler_matches.clear()

    @handle_errors
    def setup_manager(self):
        """
        Creates a manager proxy and subscribes to adapter signals. This method
        will also initialize proxies for adapters if any are available.

        May be called repeatedly (see restart()); signal matches left over
        from a previous call are removed first so that handlers are never
        registered twice. Errors are reported by handle_errors rather than
        raised.

        """
        self._remove_signal_matches()
        self._adapters.clear()

        # Create the manager proxy.
        self._manager = self._get_manager_proxy()
        if self._manager is None:
            print_message('Failed to create a proxy to the Manager interface.')
            return

        # Listen to the adapter added and removed signals. Matches are
        # recorded one by one so that a failure on the second subscription
        # still leaves the first one tracked for later removal.
        self._manager_signal_matches.append(
                self._manager.connect_to_signal(
                        'AdapterAdded',
                        lambda adapter: self.register_adapter(str(adapter))))
        self._manager_signal_matches.append(
                self._manager.connect_to_signal(
                        'AdapterRemoved',
                        lambda adapter: self.unregister_adapter(str(adapter))))

        # See if there are any adapters and create proxies for each.
        adapters = self._get_adapters()
        if adapters:
            for adapter in adapters:
                self.register_adapter(str(adapter))

    def register_adapter(self, adapter):
        """
        Registers an adapter proxy with the given object path and subscribes to
        adapter signals.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter = str(adapter)
        print_message('Added adapter: ' + adapter)
        adapter_proxy = self._get_adapter_proxy(adapter)
        if adapter_proxy is None:
            # The failure has already been reported by handle_errors; do not
            # cache a None proxy or try to subscribe through it.
            return
        self._adapters[adapter] = adapter_proxy

        # Tag found/lost currently don't get fired. Monitor property changes
        # instead.
        if self._adapter_property_handler_matches.get(adapter, None) is None:
            self._adapter_property_handler_matches[adapter] = (
                    adapter_proxy.connect_to_signal(
                            'PropertyChanged',
                            (lambda name, value:
                                    self._adapter_property_changed_signal(
                                            adapter, name, value))))

    def unregister_adapter(self, adapter):
        """
        Removes the adapter proxy for the given object path from the internal
        cache of adapters. Unknown adapters are ignored.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter = str(adapter)
        print_message('Removed adapter: ' + adapter)
        match = self._adapter_property_handler_matches.pop(adapter, None)
        if match is not None:
            match.remove()
        self._adapters.pop(adapter, None)

    def _adapter_property_changed_signal(self, adapter, name, value):
        if name in ('Tags', 'Devices'):
            print_message('Found ' + name + ': ' +
                          self._dbus_array_to_string(value))

    @handle_errors
    def show_adapters(self):
        """
        Prints the D-Bus object paths of all adapters that are available.

        """
        adapters = self._get_adapters()
        if not adapters:
            print_message('No adapters found.')
            return
        for adapter in adapters:
            print_message('  ' + str(adapter), newlines=0)
        print('')

    def _dbus_array_to_string(self, array):
        """Formats the elements of |array| as a bracketed, printable string."""
        return '[ ' + ''.join(' ' + str(value) + ', ' for value in array) + ' ]'

    def print_adapter_status(self, adapter):
        """
        Prints the properties of the given adapter.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        props = self._get_adapter_properties(adapter)
        if not props:
            return
        print_message('Status ' + adapter + ': ', newlines=0)
        for key, value in props.items():
            if isinstance(value, dbus.Array):
                value = self._dbus_array_to_string(value)
            else:
                value = str(value)
            print_message('  ' + key + ' = ' + value, newlines=0)
        print('')

    @handle_errors
    def set_powered(self, adapter, powered):
        """
        Enables or disables the adapter.

        @param adapter: string, containing the adapter's D-Bus object path.
        @param powered: boolean that dictates whether the adapter will be
                enabled or disabled.

        """
        # A missing adapter raises NfcClientException, which handle_errors
        # reports.
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        adapter_proxy.SetProperty('Powered', powered)

    @handle_errors
    def start_polling(self, adapter):
        """
        Starts polling for nearby tags and devices in "Initiator" mode.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        adapter_proxy.StartPollLoop('Initiator')
        print_message('Started polling.')

    @handle_errors
    def stop_polling(self, adapter):
        """
        Stops polling for nearby tags and devices.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        adapter_proxy.StopPollLoop()
        # NOTE(review): nothing in this class reads this flag; it is kept in
        # case external code relies on it.
        self._polling_stopped = True
        print_message('Stopped polling.')

    @handle_errors
    def show_tag_data(self, tag):
        """
        Prints the properties of the given tag, as well as the contents of any
        records associated with it.

        @param tag: string, containing the tag's D-Bus object path.

        """
        tag_proxy = self._get_tag_proxy(tag)
        if not tag_proxy:
            print_message('Tag "' + tag + '" not found.')
            return
        props = tag_proxy.GetProperties()
        print_message('Tag ' + tag + ': ', newlines=1)
        for key, value in props.items():
            if key != 'Records':
                print_message('  ' + key + ' = ' + str(value), newlines=0)
        # A tag without a 'Records' property is treated like a tag with no
        # records.
        records = props.get('Records', None)
        if not records:
            return
        print_message('Records: ', newlines=1)
        for record in records:
            self.show_record_data(str(record))
        print('')

    @handle_errors
    def show_device_data(self, device):
        """
        Prints the properties of the given device, as well as the contents of
        any records associated with it.

        @param device: string, containing the device's D-Bus object path.

        """
        device_proxy = self._get_device_proxy(device)
        if not device_proxy:
            print_message('Device "' + device + '" not found.')
            return
        records = device_proxy.GetProperties().get('Records', None)
        if not records:
            print_message('No records on device.')
            return
        print_message('Records: ', newlines=1)
        for record in records:
            self.show_record_data(str(record))
        print('')

    @handle_errors
    def show_record_data(self, record):
        """
        Prints the contents of the given record.

        @param record: string, containing the record's D-Bus object path.

        """
        record_proxy = self._get_record_proxy(record)
        if not record_proxy:
            print_message('Record "' + record + '" not found.')
            return
        props = record_proxy.GetProperties()
        print_message('Record ' + record + ': ', newlines=1)
        for key, value in props.items():
            # str() keeps non-string property values from breaking the
            # concatenation.
            print('  ' + key + ' = ' + str(value))
        print('')

    def _create_record_data(self, record_type, params):
        """
        Builds the dictionary describing an NDEF record.

        @param record_type: 'Text' or 'URI'.
        @param params: dictionary of record fields; keys that do not belong
                to |record_type| are dropped.

        @return The record dictionary (always containing 'Type'), or None if
                |record_type| is not supported.

        """
        if record_type == 'Text':
            possible_keys = ['Encoding', 'Language', 'Representation']
        elif record_type == 'URI':
            possible_keys = ['URI']
        else:
            print_message('Writing record type "' + record_type +
                          '" currently not supported.')
            return None
        tag_data = {'Type': record_type}
        for key, value in params.items():
            if key in possible_keys:
                tag_data[key] = value
        return tag_data

    @handle_errors
    def write_tag(self, tag, record_type, params):
        """
        Writes an NDEF record to the given tag.

        @param tag: string, containing the tag's D-Bus object path.
        @param record_type: The type of the record, e.g. Text or URI.
        @param params: dictionary, containing the parameters of the NDEF.

        """
        tag_data = self._create_record_data(record_type, params)
        if not tag_data:
            return
        tag_proxy = self._get_tag_proxy(tag)
        if not tag_proxy:
            print_message('Tag "' + tag + '" not found.')
            return
        tag_proxy.Write(tag_data)
        print_message('Tag written!')

    @handle_errors
    def push_to_device(self, device, record_type, params):
        """
        Pushes an NDEF record to the given device.

        @param device: string, containing the device's D-Bus object path.
        @param record_type: The type of the record, e.g. Text or URI.
        @param params: dictionary, containing the parameters of the NDEF.

        """
        record_data = self._create_record_data(record_type, params)
        if not record_data:
            return
        device_proxy = self._get_device_proxy(device)
        if not device_proxy:
            print_message('Device "' + device + '" not found.')
            return
        device_proxy.Push(record_data)
        print_message('NDEF pushed to device!')


class NfcConsole(cmd.Cmd):
    """
    Interactive console to interact with the NFC daemon.

    Each do_<name> method implements the console command <name>; the
    matching help_<name> method prints its help text.

    """
    def __init__(self):
        cmd.Cmd.__init__(self)
        self.prompt = PROMPT

    def begin(self):
        """
        Starts the interactive shell.

        The NFC client is always stopped when the command loop ends, even if
        it ends through an exception (e.g. Ctrl-C); otherwise the client's
        main loop thread would keep the process alive.

        """
        print_message('NFC console! Run "help" for a list of commands.',
                      newlines=1)
        self._nfc_client = NfcClient()
        self._nfc_client.begin()
        try:
            self.cmdloop()
        finally:
            self._nfc_client.end()

    def can_exit(self):
        """Override"""
        return True

    @staticmethod
    def _split_args(args, maxsplit=-1):
        """
        Splits a raw argument string on runs of whitespace.

        @param args: The argument string passed to a do_* handler.
        @param maxsplit: Maximum number of splits; the remainder of the
                string is kept intact as the last element. -1 means no limit.

        @return List of tokens; empty if |args| holds only whitespace.

        """
        return args.split(None, maxsplit)

    def _single_arg(self, args, usage):
        """
        Parses the argument string of a command taking exactly one argument.

        @param args: The argument string passed to a do_* handler.
        @param usage: Message printed when |args| is not exactly one token.

        @return The single token, or None after printing |usage|.

        """
        tokens = self._split_args(args)
        if len(tokens) != 1:
            print_message(usage)
            return None
        return tokens[0]

    def do_initialize(self, args):
        """Handles "initialize"."""
        if args:
            print_message('Command "initialize" expects no arguments.')
            return
        self._nfc_client.restart()

    def help_initialize(self):
        """Prints the help message for "initialize"."""
        print_message('Initializes the neard D-Bus client. This can be '
                      'run many times to restart the client in case of '
                      'neard failures or crashes.')

    def do_adapters(self, args):
        """Handles "adapters"."""
        if args:
            print_message('Command "adapters" expects no arguments.')
            return
        self._nfc_client.show_adapters()

    def help_adapters(self):
        """Prints the help message for "adapters"."""
        print_message('Displays the D-Bus object paths of the available '
                      'adapter objects.')

    def do_adapter_status(self, args):
        """Handles "adapter_status"."""
        adapter = self._single_arg(args, 'Usage: adapter_status <adapter>')
        if adapter is None:
            return
        self._nfc_client.print_adapter_status(NEARD_PATH + adapter)

    def help_adapter_status(self):
        """Prints the help message for "adapter_status"."""
        print_message('Returns the properties of the given NFC adapter.\n\n'
                      '    Ex: "adapter_status nfc0"')

    def do_enable_adapter(self, args):
        """Handles "enable_adapter"."""
        adapter = self._single_arg(args, 'Usage: enable_adapter <adapter>')
        if adapter is None:
            return
        self._nfc_client.set_powered(NEARD_PATH + adapter, True)

    def help_enable_adapter(self):
        """Prints the help message for "enable_adapter"."""
        print_message('Powers up the adapter. Ex: "enable_adapter nfc0"')

    def do_disable_adapter(self, args):
        """Handles "disable_adapter"."""
        adapter = self._single_arg(args, 'Usage: disable_adapter <adapter>')
        if adapter is None:
            return
        self._nfc_client.set_powered(NEARD_PATH + adapter, False)

    def help_disable_adapter(self):
        """Prints the help message for "disable_adapter"."""
        print_message('Powers down the adapter. Ex: "disable_adapter nfc0"')

    def do_start_poll(self, args):
        """Handles "start_poll"."""
        adapter = self._single_arg(args, 'Usage: start_poll <adapter>')
        if adapter is None:
            return
        self._nfc_client.start_polling(NEARD_PATH + adapter)

    def help_start_poll(self):
        """Prints the help message for "start_poll"."""
        print_message('Initiates a poll loop.\n\n    Ex: "start_poll nfc0"')

    def do_stop_poll(self, args):
        """Handles "stop_poll"."""
        adapter = self._single_arg(args, 'Usage: stop_poll <adapter>')
        if adapter is None:
            return
        self._nfc_client.stop_polling(NEARD_PATH + adapter)

    def help_stop_poll(self):
        """Prints the help message for "stop_poll"."""
        print_message('Stops a poll loop.\n\n    Ex: "stop_poll nfc0"')

    def do_read_tag(self, args):
        """Handles "read_tag"."""
        tag = self._single_arg(args, 'Usage read_tag <tag>')
        if tag is None:
            return
        self._nfc_client.show_tag_data(NEARD_PATH + tag)

    def help_read_tag(self):
        """Prints the help message for "read_tag"."""
        print_message('Reads the contents of a tag.  Ex: read_tag nfc0/tag0')

    def _parse_record_args(self, record_type, args, command='write_tag',
                           target='<tag>'):
        """
        Extracts the record parameters from a tokenized command line.

        @param record_type: 'Text' or 'URI'.
        @param args: List of tokens: the target object, the record type,
                then the type specific parameters.
        @param command: Name of the console command, used in usage messages.
        @param target: Placeholder for the first argument, used in usage
                messages.

        @return Dictionary of record parameters, or None after printing a
                message if the arguments are invalid.

        """
        if record_type == 'Text':
            if len(args) < 5:
                print_message('Usage: ' + command + ' ' + target +
                              ' Text <encoding> <language> <representation>')
                return None
            if args[2] not in ['UTF-8', 'UTF-16']:
                print_message('Encoding must be one of "UTF-8" or "UTF-16".')
                return None
            return {
                'Encoding': args[2],
                'Language': args[3],
                # Everything after the language is the text itself.
                'Representation': ' '.join(args[4:])
            }
        if record_type == 'URI':
            if len(args) != 3:
                print_message('Usage: ' + command + ' ' + target +
                              ' URI <uri>')
                return None
            return {
                'URI': args[2]
            }
        print_message('Only types "Text" and "URI" are supported by this '
                      'script.')
        return None

    def do_write_tag(self, args):
        """Handles "write_tag"."""
        # At most 4 splits: the fifth token is the free-form text of a Text
        # record and must keep its internal whitespace.
        args = self._split_args(args, 4)
        if len(args) < 3:
            print_message('Usage: write_tag <tag> [params]')
            return
        record_type = args[1]
        params = self._parse_record_args(record_type, args)
        if not params:
            return
        self._nfc_client.write_tag(NEARD_PATH + args[0],
                                   record_type, params)

    def help_write_tag(self):
        """Prints the help message for "write_tag"."""
        print_message('Writes the given data to a tag. Usage:\n'
                      '  write_tag <tag> Text <encoding> <language> '
                      '<representation>\n  write_tag <tag> URI <uri>')

    def do_read_device(self, args):
        """Handles "read_device"."""
        device = self._single_arg(args, 'Usage read_device <device>')
        if device is None:
            return
        self._nfc_client.show_device_data(NEARD_PATH + device)

    def help_read_device(self):
        """Prints the help message for "read_device"."""
        print_message('Reads the contents of a device.  Ex: read_device '
                      'nfc0/device0')

    def do_push_to_device(self, args):
        """Handles "push_to_device"."""
        # See do_write_tag for why the split is limited.
        args = self._split_args(args, 4)
        if len(args) < 3:
            print_message('Usage: push_to_device <device> [params]')
            return
        record_type = args[1]
        params = self._parse_record_args(record_type, args,
                                         command='push_to_device',
                                         target='<device>')
        if not params:
            return
        self._nfc_client.push_to_device(NEARD_PATH + args[0],
                                        record_type, params)

    def help_push_to_device(self):
        """Prints the help message for "push_to_device"."""
        print_message('Pushes the given data to a device. Usage:\n'
                      '  push_to_device <device> Text <encoding> <language> '
                      '<representation>\n  push_to_device <device> URI <uri>')

    def do_exit(self, args):
        """
        Handles the 'exit' command.

        @param args: Arguments to the command. Must be empty.

        @return True if the console should exit, False or None otherwise.

        """
        if args:
            print_message('Command "exit" expects no arguments.')
            return
        try:
            read_line = raw_input
        except NameError:
            # Python 3 renamed raw_input to input.
            read_line = input
        try:
            resp = read_line('Are you sure? (yes/no): ')
        except EOFError:
            # Input is exhausted, so no answer can ever arrive; exit instead
            # of failing (this method also serves as do_EOF).
            resp = 'yes'
        if resp == 'yes':
            print_message('Goodbye!')
            self._nfc_client.end()
            return True
        if resp != 'no':
            print_message('Did not understand: ' + resp)
        return False

    def help_exit(self):
        """Handles the 'help exit' command."""
        print_message('Exits the console.')

    do_EOF = do_exit
    help_EOF = help_exit


def main():
    """Main function: creates the NFC console and runs it until exit."""
    NfcConsole().begin()


# Run the console only when executed as a script, not when imported.
if __name__ == '__main__':
    main()