blob: 3fd276cd7522da8139aac748c542be64b8640fc3 [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Note that this module is Python 2/3 compatible because factory and autotest
# Python 2 code uses it in addition to the scripts in this directory.
# DEPRECATED
# Do not use flimflam.py in future development.
# Extend / migrate to shill_proxy suite of scripts instead.
from __future__ import print_function
import logging
import sys
import time
import dbus
# dbus-python only allows the use of the 'utf8_strings' kwarg in Py2:
# https://dbus.freedesktop.org/doc/dbus-python/PY3PORT.html#user-visible-changes
if sys.version_info.major < 3:
utf8_kwargs = {'utf8_strings': True}
else:
utf8_kwargs = {}
DEFAULT_CELLULAR_TIMEOUT = 60
def make_dbus_boolean(value):
value = value.upper()
if value in ['ON', 'TRUE']:
return dbus.Boolean(1)
elif value in ['OFF', 'FALSE']:
return dbus.Boolean(0)
else:
return dbus.Boolean(int(value))
#
# Convert a DBus value to a printable value; used
# to print properties returned via DBus
#
def convert_dbus_value(value, indent=0):
# DEPRECATED
spacer = ' ' * indent
if value.__class__ == dbus.Byte:
return int(value)
elif value.__class__ == dbus.Boolean:
return bool(value)
elif value.__class__ == dbus.Dictionary:
valstr = '{'
for key in value:
valstr += '\n' + spacer + ' ' + \
key + ': ' + str(convert_dbus_value(value[key], indent + 4))
valstr += '\n' + spacer + '}'
return valstr
elif value.__class__ == dbus.Array:
valstr = '['
for val in value:
valstr += '\n' + spacer + ' ' + \
str(convert_dbus_value(val, indent + 4))
valstr += '\n' + spacer + ']'
return valstr
else:
return str(value)
class FlimFlam(object):
# DEPRECATED
SHILL_DBUS_INTERFACE = 'org.chromium.flimflam'
UNKNOWN_METHOD = 'org.freedesktop.DBus.Error.UnknownMethod'
UNKNOWN_OBJECT = 'org.freedesktop.DBus.Error.UnknownObject'
DEVICE_CELLULAR = 'cellular'
@staticmethod
def _GetContainerName(kind):
"""Map shill element names to the names of their collections."""
# For example, Device - > Devices.
# Just pulling this out so we can use a map if we start
# caring about "AvailableTechnologies"
return kind + 's'
@staticmethod
def WaitForServiceState(service, expected_states, timeout,
ignore_failure=False, property_name='State'):
"""Wait until service enters a state in expected_states or times out.
Args:
service: service to watch
expected_states: list of exit states
timeout: in seconds
ignore_failure: should the failure state be ignored?
property_name: name of service property
Returns: (state, seconds waited)
If the state is "failure" and ignore_failure is False we return
immediately without waiting for the timeout.
"""
state = None
start_time = time.time()
timeout = start_time + timeout
while time.time() < timeout:
properties = service.GetProperties(**utf8_kwargs)
state = properties.get(property_name, None)
if ((state == 'failure' and not ignore_failure) or
state in expected_states):
break
time.sleep(.5)
config_time = time.time() - start_time
# str() to remove DBus boxing
return (str(state), config_time)
@staticmethod
def DisconnectService(service, wait_timeout=15):
try:
service.Disconnect()
except dbus.exceptions.DBusException as error:
if error.get_dbus_name() not in [
FlimFlam.SHILL_DBUS_INTERFACE + '.Error.InProgress',
FlimFlam.SHILL_DBUS_INTERFACE + '.Error.NotConnected', ]:
raise error
return FlimFlam.WaitForServiceState(service, ['idle'], wait_timeout)
def __init__(self, bus=None):
if not bus:
bus = dbus.SystemBus()
self.bus = bus
shill = bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, '/')
self.manager = dbus.Interface(
shill,
FlimFlam.SHILL_DBUS_INTERFACE + '.Manager')
def GetManager(self):
return self.manager
def _FindDevice(self, device_type, timeout):
""" Return the first device object that matches a given device type.
Wait until the device type is avilable or until timeout
Args:
device_type: string format of the type of device.
timeout: in seconds
Returns: Device or None
"""
timeout = time.time() + timeout
device_obj = None
while time.time() < timeout:
device_obj = self.FindElementByPropertySubstring('Device',
'Type',
device_type)
if device_obj:
break
time.sleep(1)
return device_obj
def FindCellularDevice(self, timeout=DEFAULT_CELLULAR_TIMEOUT):
return self._FindDevice(self.DEVICE_CELLULAR, timeout)
def _FindService(self, device_type, timeout):
"""Return the first service object that matches the device type.
Wait until a service is available or until the timeout.
Args:
device_type: string format of the type of device.
timeout: in seconds
Returns: service or None
"""
start_time = time.time()
timeout = start_time + timeout
service = None
while time.time() < timeout:
service = self.FindElementByPropertySubstring('Service',
'Type', device_type)
if service:
break
time.sleep(.5)
return service
def FindCellularService(self, timeout=DEFAULT_CELLULAR_TIMEOUT):
return self._FindService(self.DEVICE_CELLULAR, timeout)
def GetService(self, params):
path = self.manager.GetService(params)
return self.GetObjectInterface('Service', path)
def GetNetworksForGeolocation(self):
return self.manager.GetNetworksForGeolocation()
def ConnectService(self, assoc_timeout=15, config_timeout=15,
async=False, service=None, service_type='',
retry=False, retries=1, retry_sleep=15,
save_creds=False,
**kwargs):
"""Connect to a service and wait until connection is up
Args:
assoc_timeout, config_timeout: Timeouts in seconds.
async: return immediately. do not wait for connection.
service: DBus service
service_type: If supplied, invoke type-specific code to find service.
retry: Retry connection after Connect failure.
retries: Number of retries to allow.
retry_sleep: Number of seconds to wait before retrying.
kwargs: Additional args for type-specific code
Returns:
(success, dict), where dict contains stats and diagnostics.
"""
output = {}
connected_states = ['ready', 'portal', 'online']
# Retry connections on failure. Need to call GetService again as some
# Connect failure states are unrecoverable.
connect_success = False
while not connect_success:
if service_type == 'wifi':
try:
# Coherence check to make sure the caller hasn't provided
# both a service and a service type. At which point its
# unclear what they actually want to do, so err on the
# side of caution and except out.
if service:
raise Exception('supplied service and service type')
params = {
'Type': service_type,
'Mode': kwargs['mode'],
'SSID': kwargs['ssid'],
'Security': kwargs.get('security', 'none'),
'SaveCredentials': save_creds}
# Supply a passphrase only if it is non-empty.
passphrase = kwargs.get('passphrase', '')
if passphrase:
params['Passphrase'] = passphrase
path = self.manager.GetService(params)
service = self.GetObjectInterface('Service', path)
except Exception as e:
output['reason'] = 'FAIL(GetService): exception %s' % e
return (False, output)
output['service'] = service
try:
service.Connect()
connect_success = True
except Exception as e:
if not retry or retries == 0:
output['reason'] = 'FAIL(Connect): exception %s' % e
return (False, output)
else:
logging.info('INFO(Connect): connect failed. Retrying...')
retries -= 1
if not connect_success:
# FlimFlam can be a little funny sometimes. At least for In
# Progress errors, even though the service state may be failed,
# it is actually still trying to connect. As such, while we're
# waiting for retry, keep checking the service state to see if
# it actually succeeded in connecting.
state = FlimFlam.WaitForServiceState(
service=service,
expected_states=connected_states,
timeout=retry_sleep,
ignore_failure=True)[0]
if state in connected_states:
return (True, output)
# While service can be caller provided, it is also set by the
# GetService call above. If service was not caller provided we
# need to reset it to None so we don't fail the coherence check
# above.
if service_type != '':
service = None
if async:
return (True, output)
logging.info('Associating...')
(state, assoc_time) = (
FlimFlam.WaitForServiceState(service,
['configuration'] + connected_states,
assoc_timeout))
output['state'] = state
if state == 'failure':
output['reason'] = 'FAIL(assoc)'
if assoc_time > assoc_timeout:
output['reason'] = 'TIMEOUT(assoc)'
output['assoc_time'] = assoc_time
if 'reason' in output:
return (False, output)
(state, config_time) = (
FlimFlam.WaitForServiceState(service,
connected_states, config_timeout))
output['state'] = state
if state == 'failure':
output['reason'] = 'FAIL(config)'
if config_time > config_timeout:
output['reason'] = 'TIMEOUT(config)'
output['config_time'] = config_time
if 'reason' in output:
return (False, output)
return (True, output)
def GetObjectInterface(self, kind, path):
return dbus.Interface(
self.bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, path),
FlimFlam.SHILL_DBUS_INTERFACE + '.' + kind)
def FindElementByNameSubstring(self, kind, substring):
properties = self.manager.GetProperties(**utf8_kwargs)
for path in properties[FlimFlam._GetContainerName(kind)]:
if path.find(substring) >= 0:
return self.GetObjectInterface(kind, path)
return None
def FindElementByProperty(self, kind, prop, val):
properties = self.manager.GetProperties(**utf8_kwargs)
for path in properties[FlimFlam._GetContainerName(kind)]:
obj = self.GetObjectInterface(kind, path)
try:
obj_properties = obj.GetProperties(**utf8_kwargs)
except dbus.exceptions.DBusException as error:
if (error.get_dbus_name() == self.UNKNOWN_METHOD or
error.get_dbus_name() == self.UNKNOWN_OBJECT):
# object disappeared; ignore and keep looking
continue
else:
raise error
if prop in obj_properties and obj_properties[prop] == val:
return obj
return None
def FindElementByPropertySubstring(self, kind, prop, substring):
properties = self.manager.GetProperties(**utf8_kwargs)
for path in properties[FlimFlam._GetContainerName(kind)]:
obj = self.GetObjectInterface(kind, path)
try:
obj_properties = obj.GetProperties(**utf8_kwargs)
except dbus.exceptions.DBusException as error:
if (error.get_dbus_name() == self.UNKNOWN_METHOD or
error.get_dbus_name() == self.UNKNOWN_OBJECT):
# object disappeared; ignore and keep looking
continue
else:
raise error
if (prop in obj_properties and
obj_properties[prop].find(substring) >= 0):
return obj
return None
def GetObjectList(self, kind, properties=None):
if properties is None:
properties = self.manager.GetProperties(**utf8_kwargs)
return [self.GetObjectInterface(kind, path)
for path in properties[FlimFlam._GetContainerName(kind)]]
def CreateProfile(self, ident):
path = self.manager.CreateProfile(ident)
return self.GetObjectInterface('Profile', path)
def RemoveProfile(self, ident):
self.manager.RemoveProfile(ident)
def PushProfile(self, ident):
path = self.manager.PushProfile(ident)
return self.GetObjectInterface('Profile', path)
def PopProfile(self, ident):
self.manager.PopProfile(ident)
def PopAnyProfile(self):
self.manager.PopAnyProfile()
def GetSystemState(self):
properties = self.manager.GetProperties(**utf8_kwargs)
return properties['State']
def GetDebugTags(self):
return self.manager.GetDebugTags()
def ListDebugTags(self):
return self.manager.ListDebugTags()
def SetDebugTags(self, taglist):
try:
self.manager.SetDebugTags(taglist)
self.SetDebugLevel(-4)
except dbus.exceptions.DBusException as error:
if error.get_dbus_name() not in [
'org.freedesktop.DBus.Error.UnknownMethod']:
raise error
def SetDebugLevel(self, level):
self.manager.SetDebugLevel(level)
def GetServiceOrder(self):
return self.manager.GetServiceOrder()
def SetServiceOrder(self, new_order):
old_order = self.GetServiceOrder()
self.manager.SetServiceOrder(new_order)
return (old_order, new_order)
def EnableTechnology(self, tech):
try:
self.manager.EnableTechnology(tech)
except dbus.exceptions.DBusException as error:
if error.get_dbus_name() not in [
FlimFlam.SHILL_DBUS_INTERFACE + '.Error.AlreadyEnabled',
FlimFlam.SHILL_DBUS_INTERFACE + '.Error.InProgress']:
raise error
def DisableTechnology(self, tech):
self.manager.DisableTechnology(tech, timeout=60)
def RequestScan(self, technology):
self.manager.RequestScan(technology)
def SetCheckPortalList(self, tech_list):
self.manager.SetProperty('CheckPortalList', tech_list)
def SetArpGateway(self, do_arp_gateway):
self.manager.SetProperty('ArpGateway', do_arp_gateway)
class DeviceManager(object):
# DEPRECATED
"""Use flimflam to isolate a given interface for testing.
DeviceManager can be used to turn off network devices that are not
under test so that they will not interfere with testing.
NB: Ethernet devices are special inside Flimflam. You will need to
take care of them via other means (like, for example, the
backchannel ethernet code in client autotests)
Sample usage:
device_manager = flimflam.DeviceManager()
try:
device_manager.ShutdownAllExcept('cellular')
use routing.getRouteFor()
to verify that only the expected device is used
do stuff to test cellular connections
finally:
device_manager.RestoreDevices()
"""
@staticmethod
def _EnableDevice(device, enable):
"""Enables/Disables a device in shill."""
if enable:
device.Enable()
else:
device.Disable()
def __init__(self, flim=None):
self.flim_ = flim or FlimFlam()
self.devices_to_restore_ = []
def ShutdownAllExcept(self, device_type):
"""Shutdown all devices except device_type ones."""
for device in self.flim_.GetObjectList('Device'):
device_properties = device.GetProperties(**utf8_kwargs)
if device_properties['Type'] != device_type:
logging.info('Powering off %s device %s',
device_properties['Type'],
device.object_path)
self.devices_to_restore_.append(device.object_path)
DeviceManager._EnableDevice(device, False)
def RestoreDevices(self):
"""Restore devices powered down in ShutdownAllExcept."""
should_raise = False
to_raise = Exception('Nothing to raise')
for device_path in self.devices_to_restore_:
try:
logging.info('Attempting to power on device %s', device_path)
device = self.flim_.GetObjectInterface('Device', device_path)
DeviceManager._EnableDevice(device, True)
except Exception as e:
# We want to keep on trying to power things on, so save an
# exception and continue
should_raise = True
to_raise = e
if should_raise:
raise to_raise