| # Copyright 2018 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| # Note that this module is Python 2/3 compatible because factory and autotest |
| # Python 2 code uses it in addition to the scripts in this directory. |
| |
| # DEPRECATED |
| # Do not use flimflam.py in future development. |
| # Extend / migrate to shill_proxy suite of scripts instead. |
| |
| from __future__ import print_function |
| |
| import logging |
| import sys |
| import time |
| |
| import dbus |
| |
| # dbus-python only allows the use of the 'utf8_strings' kwarg in Py2: |
| # https://dbus.freedesktop.org/doc/dbus-python/PY3PORT.html#user-visible-changes |
| if sys.version_info.major < 3: |
| utf8_kwargs = {'utf8_strings': True} |
| else: |
| utf8_kwargs = {} |
| |
| DEFAULT_CELLULAR_TIMEOUT = 60 |
| |
| |
| def make_dbus_boolean(value): |
| value = value.upper() |
| if value in ['ON', 'TRUE']: |
| return dbus.Boolean(1) |
| elif value in ['OFF', 'FALSE']: |
| return dbus.Boolean(0) |
| else: |
| return dbus.Boolean(int(value)) |
| |
| # |
| # Convert a DBus value to a printable value; used |
| # to print properties returned via DBus |
| # |
| |
| |
| def convert_dbus_value(value, indent=0): |
| # DEPRECATED |
| spacer = ' ' * indent |
| if value.__class__ == dbus.Byte: |
| return int(value) |
| elif value.__class__ == dbus.Boolean: |
| return bool(value) |
| elif value.__class__ == dbus.Dictionary: |
| valstr = '{' |
| for key in value: |
| valstr += '\n' + spacer + ' ' + \ |
| key + ': ' + str(convert_dbus_value(value[key], indent + 4)) |
| valstr += '\n' + spacer + '}' |
| return valstr |
| elif value.__class__ == dbus.Array: |
| valstr = '[' |
| for val in value: |
| valstr += '\n' + spacer + ' ' + \ |
| str(convert_dbus_value(val, indent + 4)) |
| valstr += '\n' + spacer + ']' |
| return valstr |
| else: |
| return str(value) |
| |
| |
| class FlimFlam(object): |
| # DEPRECATED |
| |
| SHILL_DBUS_INTERFACE = 'org.chromium.flimflam' |
| UNKNOWN_METHOD = 'org.freedesktop.DBus.Error.UnknownMethod' |
| UNKNOWN_OBJECT = 'org.freedesktop.DBus.Error.UnknownObject' |
| |
| DEVICE_CELLULAR = 'cellular' |
| |
| @staticmethod |
| def _GetContainerName(kind): |
| """Map shill element names to the names of their collections.""" |
| # For example, Device - > Devices. |
| # Just pulling this out so we can use a map if we start |
| # caring about "AvailableTechnologies" |
| return kind + 's' |
| |
| @staticmethod |
| def WaitForServiceState(service, expected_states, timeout, |
| ignore_failure=False, property_name='State'): |
| """Wait until service enters a state in expected_states or times out. |
| Args: |
| service: service to watch |
| expected_states: list of exit states |
| timeout: in seconds |
| ignore_failure: should the failure state be ignored? |
| property_name: name of service property |
| |
| Returns: (state, seconds waited) |
| |
| If the state is "failure" and ignore_failure is False we return |
| immediately without waiting for the timeout. |
| """ |
| |
| state = None |
| start_time = time.time() |
| timeout = start_time + timeout |
| while time.time() < timeout: |
| properties = service.GetProperties(**utf8_kwargs) |
| state = properties.get(property_name, None) |
| if ((state == 'failure' and not ignore_failure) or |
| state in expected_states): |
| break |
| time.sleep(.5) |
| |
| config_time = time.time() - start_time |
| # str() to remove DBus boxing |
| return (str(state), config_time) |
| |
| @staticmethod |
| def DisconnectService(service, wait_timeout=15): |
| try: |
| service.Disconnect() |
| except dbus.exceptions.DBusException as error: |
| if error.get_dbus_name() not in [ |
| FlimFlam.SHILL_DBUS_INTERFACE + '.Error.InProgress', |
| FlimFlam.SHILL_DBUS_INTERFACE + '.Error.NotConnected', ]: |
| raise error |
| return FlimFlam.WaitForServiceState(service, ['idle'], wait_timeout) |
| |
| def __init__(self, bus=None): |
| if not bus: |
| bus = dbus.SystemBus() |
| self.bus = bus |
| shill = bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, '/') |
| self.manager = dbus.Interface( |
| shill, |
| FlimFlam.SHILL_DBUS_INTERFACE + '.Manager') |
| |
| def GetManager(self): |
| return self.manager |
| |
| def _FindDevice(self, device_type, timeout): |
| """ Return the first device object that matches a given device type. |
| |
| Wait until the device type is avilable or until timeout |
| |
| Args: |
| device_type: string format of the type of device. |
| timeout: in seconds |
| |
| Returns: Device or None |
| """ |
| timeout = time.time() + timeout |
| device_obj = None |
| while time.time() < timeout: |
| device_obj = self.FindElementByPropertySubstring('Device', |
| 'Type', |
| device_type) |
| if device_obj: |
| break |
| time.sleep(1) |
| return device_obj |
| |
| def FindCellularDevice(self, timeout=DEFAULT_CELLULAR_TIMEOUT): |
| return self._FindDevice(self.DEVICE_CELLULAR, timeout) |
| |
| def _FindService(self, device_type, timeout): |
| """Return the first service object that matches the device type. |
| |
| Wait until a service is available or until the timeout. |
| |
| Args: |
| device_type: string format of the type of device. |
| timeout: in seconds |
| |
| Returns: service or None |
| """ |
| start_time = time.time() |
| timeout = start_time + timeout |
| service = None |
| while time.time() < timeout: |
| service = self.FindElementByPropertySubstring('Service', |
| 'Type', device_type) |
| if service: |
| break |
| time.sleep(.5) |
| return service |
| |
| def FindCellularService(self, timeout=DEFAULT_CELLULAR_TIMEOUT): |
| return self._FindService(self.DEVICE_CELLULAR, timeout) |
| |
| def GetService(self, params): |
| path = self.manager.GetService(params) |
| return self.GetObjectInterface('Service', path) |
| |
| def GetNetworksForGeolocation(self): |
| return self.manager.GetNetworksForGeolocation() |
| |
| def ConnectService(self, assoc_timeout=15, config_timeout=15, |
| async=False, service=None, service_type='', |
| retry=False, retries=1, retry_sleep=15, |
| save_creds=False, |
| **kwargs): |
| """Connect to a service and wait until connection is up |
| Args: |
| assoc_timeout, config_timeout: Timeouts in seconds. |
| async: return immediately. do not wait for connection. |
| service: DBus service |
| service_type: If supplied, invoke type-specific code to find service. |
| retry: Retry connection after Connect failure. |
| retries: Number of retries to allow. |
| retry_sleep: Number of seconds to wait before retrying. |
| kwargs: Additional args for type-specific code |
| |
| Returns: |
| (success, dict), where dict contains stats and diagnostics. |
| """ |
| output = {} |
| connected_states = ['ready', 'portal', 'online'] |
| |
| # Retry connections on failure. Need to call GetService again as some |
| # Connect failure states are unrecoverable. |
| connect_success = False |
| while not connect_success: |
| if service_type == 'wifi': |
| try: |
| # Coherence check to make sure the caller hasn't provided |
| # both a service and a service type. At which point its |
| # unclear what they actually want to do, so err on the |
| # side of caution and except out. |
| if service: |
| raise Exception('supplied service and service type') |
| params = { |
| 'Type': service_type, |
| 'Mode': kwargs['mode'], |
| 'SSID': kwargs['ssid'], |
| 'Security': kwargs.get('security', 'none'), |
| 'SaveCredentials': save_creds} |
| # Supply a passphrase only if it is non-empty. |
| passphrase = kwargs.get('passphrase', '') |
| if passphrase: |
| params['Passphrase'] = passphrase |
| path = self.manager.GetService(params) |
| service = self.GetObjectInterface('Service', path) |
| except Exception as e: |
| output['reason'] = 'FAIL(GetService): exception %s' % e |
| return (False, output) |
| |
| output['service'] = service |
| |
| try: |
| service.Connect() |
| connect_success = True |
| except Exception as e: |
| if not retry or retries == 0: |
| output['reason'] = 'FAIL(Connect): exception %s' % e |
| return (False, output) |
| else: |
| logging.info('INFO(Connect): connect failed. Retrying...') |
| retries -= 1 |
| |
| if not connect_success: |
| # FlimFlam can be a little funny sometimes. At least for In |
| # Progress errors, even though the service state may be failed, |
| # it is actually still trying to connect. As such, while we're |
| # waiting for retry, keep checking the service state to see if |
| # it actually succeeded in connecting. |
| state = FlimFlam.WaitForServiceState( |
| service=service, |
| expected_states=connected_states, |
| timeout=retry_sleep, |
| ignore_failure=True)[0] |
| |
| if state in connected_states: |
| return (True, output) |
| |
| # While service can be caller provided, it is also set by the |
| # GetService call above. If service was not caller provided we |
| # need to reset it to None so we don't fail the coherence check |
| # above. |
| if service_type != '': |
| service = None |
| |
| if async: |
| return (True, output) |
| |
| logging.info('Associating...') |
| (state, assoc_time) = ( |
| FlimFlam.WaitForServiceState(service, |
| ['configuration'] + connected_states, |
| assoc_timeout)) |
| output['state'] = state |
| if state == 'failure': |
| output['reason'] = 'FAIL(assoc)' |
| if assoc_time > assoc_timeout: |
| output['reason'] = 'TIMEOUT(assoc)' |
| output['assoc_time'] = assoc_time |
| if 'reason' in output: |
| return (False, output) |
| |
| (state, config_time) = ( |
| FlimFlam.WaitForServiceState(service, |
| connected_states, config_timeout)) |
| output['state'] = state |
| if state == 'failure': |
| output['reason'] = 'FAIL(config)' |
| if config_time > config_timeout: |
| output['reason'] = 'TIMEOUT(config)' |
| output['config_time'] = config_time |
| |
| if 'reason' in output: |
| return (False, output) |
| |
| return (True, output) |
| |
| def GetObjectInterface(self, kind, path): |
| return dbus.Interface( |
| self.bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, path), |
| FlimFlam.SHILL_DBUS_INTERFACE + '.' + kind) |
| |
| def FindElementByNameSubstring(self, kind, substring): |
| properties = self.manager.GetProperties(**utf8_kwargs) |
| for path in properties[FlimFlam._GetContainerName(kind)]: |
| if path.find(substring) >= 0: |
| return self.GetObjectInterface(kind, path) |
| return None |
| |
| def FindElementByProperty(self, kind, prop, val): |
| properties = self.manager.GetProperties(**utf8_kwargs) |
| for path in properties[FlimFlam._GetContainerName(kind)]: |
| obj = self.GetObjectInterface(kind, path) |
| try: |
| obj_properties = obj.GetProperties(**utf8_kwargs) |
| except dbus.exceptions.DBusException as error: |
| if (error.get_dbus_name() == self.UNKNOWN_METHOD or |
| error.get_dbus_name() == self.UNKNOWN_OBJECT): |
| # object disappeared; ignore and keep looking |
| continue |
| else: |
| raise error |
| if prop in obj_properties and obj_properties[prop] == val: |
| return obj |
| return None |
| |
| def FindElementByPropertySubstring(self, kind, prop, substring): |
| properties = self.manager.GetProperties(**utf8_kwargs) |
| for path in properties[FlimFlam._GetContainerName(kind)]: |
| obj = self.GetObjectInterface(kind, path) |
| try: |
| obj_properties = obj.GetProperties(**utf8_kwargs) |
| except dbus.exceptions.DBusException as error: |
| if (error.get_dbus_name() == self.UNKNOWN_METHOD or |
| error.get_dbus_name() == self.UNKNOWN_OBJECT): |
| # object disappeared; ignore and keep looking |
| continue |
| else: |
| raise error |
| if (prop in obj_properties and |
| obj_properties[prop].find(substring) >= 0): |
| return obj |
| return None |
| |
| def GetObjectList(self, kind, properties=None): |
| if properties is None: |
| properties = self.manager.GetProperties(**utf8_kwargs) |
| return [self.GetObjectInterface(kind, path) |
| for path in properties[FlimFlam._GetContainerName(kind)]] |
| |
| def CreateProfile(self, ident): |
| path = self.manager.CreateProfile(ident) |
| return self.GetObjectInterface('Profile', path) |
| |
| def RemoveProfile(self, ident): |
| self.manager.RemoveProfile(ident) |
| |
| def PushProfile(self, ident): |
| path = self.manager.PushProfile(ident) |
| return self.GetObjectInterface('Profile', path) |
| |
| def PopProfile(self, ident): |
| self.manager.PopProfile(ident) |
| |
| def PopAnyProfile(self): |
| self.manager.PopAnyProfile() |
| |
| def GetSystemState(self): |
| properties = self.manager.GetProperties(**utf8_kwargs) |
| return properties['State'] |
| |
| def GetDebugTags(self): |
| return self.manager.GetDebugTags() |
| |
| def ListDebugTags(self): |
| return self.manager.ListDebugTags() |
| |
| def SetDebugTags(self, taglist): |
| try: |
| self.manager.SetDebugTags(taglist) |
| self.SetDebugLevel(-4) |
| except dbus.exceptions.DBusException as error: |
| if error.get_dbus_name() not in [ |
| 'org.freedesktop.DBus.Error.UnknownMethod']: |
| raise error |
| |
| def SetDebugLevel(self, level): |
| self.manager.SetDebugLevel(level) |
| |
| def GetServiceOrder(self): |
| return self.manager.GetServiceOrder() |
| |
| def SetServiceOrder(self, new_order): |
| old_order = self.GetServiceOrder() |
| self.manager.SetServiceOrder(new_order) |
| return (old_order, new_order) |
| |
| def EnableTechnology(self, tech): |
| try: |
| self.manager.EnableTechnology(tech) |
| except dbus.exceptions.DBusException as error: |
| if error.get_dbus_name() not in [ |
| FlimFlam.SHILL_DBUS_INTERFACE + '.Error.AlreadyEnabled', |
| FlimFlam.SHILL_DBUS_INTERFACE + '.Error.InProgress']: |
| raise error |
| |
| def DisableTechnology(self, tech): |
| self.manager.DisableTechnology(tech, timeout=60) |
| |
| def RequestScan(self, technology): |
| self.manager.RequestScan(technology) |
| |
| def SetCheckPortalList(self, tech_list): |
| self.manager.SetProperty('CheckPortalList', tech_list) |
| |
| def SetArpGateway(self, do_arp_gateway): |
| self.manager.SetProperty('ArpGateway', do_arp_gateway) |
| |
| |
| class DeviceManager(object): |
| # DEPRECATED |
| """Use flimflam to isolate a given interface for testing. |
| |
| DeviceManager can be used to turn off network devices that are not |
| under test so that they will not interfere with testing. |
| |
| NB: Ethernet devices are special inside Flimflam. You will need to |
| take care of them via other means (like, for example, the |
| backchannel ethernet code in client autotests) |
| |
| Sample usage: |
| |
| device_manager = flimflam.DeviceManager() |
| try: |
| device_manager.ShutdownAllExcept('cellular') |
| use routing.getRouteFor() |
| to verify that only the expected device is used |
| do stuff to test cellular connections |
| finally: |
| device_manager.RestoreDevices() |
| """ |
| |
| @staticmethod |
| def _EnableDevice(device, enable): |
| """Enables/Disables a device in shill.""" |
| if enable: |
| device.Enable() |
| else: |
| device.Disable() |
| |
| def __init__(self, flim=None): |
| self.flim_ = flim or FlimFlam() |
| self.devices_to_restore_ = [] |
| |
| def ShutdownAllExcept(self, device_type): |
| """Shutdown all devices except device_type ones.""" |
| for device in self.flim_.GetObjectList('Device'): |
| device_properties = device.GetProperties(**utf8_kwargs) |
| if device_properties['Type'] != device_type: |
| logging.info('Powering off %s device %s', |
| device_properties['Type'], |
| device.object_path) |
| self.devices_to_restore_.append(device.object_path) |
| DeviceManager._EnableDevice(device, False) |
| |
| def RestoreDevices(self): |
| """Restore devices powered down in ShutdownAllExcept.""" |
| should_raise = False |
| to_raise = Exception('Nothing to raise') |
| for device_path in self.devices_to_restore_: |
| try: |
| logging.info('Attempting to power on device %s', device_path) |
| device = self.flim_.GetObjectInterface('Device', device_path) |
| DeviceManager._EnableDevice(device, True) |
| except Exception as e: |
| # We want to keep on trying to power things on, so save an |
| # exception and continue |
| should_raise = True |
| to_raise = e |
| if should_raise: |
| raise to_raise |