blob: 3f55c93ddbd08cd4e85352feae2bac5059565f34 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
This module provides bindings for ModemManager1.
"""
import dbus
import dbus.mainloop.glib
from autotest_lib.client.bin import utils
from autotest_lib.client.cros.cellular import mm1_constants
def _is_unknown_dbus_binding_exception(e):
return (isinstance(e, dbus.exceptions.DBusException) and
e.get_dbus_name() in [mm1_constants.DBUS_SERVICE_UNKNOWN,
mm1_constants.DBUS_UNKNOWN_METHOD,
mm1_constants.DBUS_UNKNOWN_OBJECT,
mm1_constants.DBUS_UNKNOWN_INTERFACE])
class ModemManager1ProxyError(Exception):
"""Exceptions raised by ModemManager1ProxyError and it's children."""
pass
class ModemManager1Proxy(object):
"""A wrapper around a DBus proxy for ModemManager1."""
# Amount of time to wait between attempts to connect to ModemManager1.
CONNECT_WAIT_INTERVAL_SECONDS = 0.2
@classmethod
def get_proxy(cls, bus=None, timeout_seconds=10):
"""Connect to ModemManager1 over DBus, retrying if necessary.
After connecting to ModemManager1, this method will verify that
ModemManager1 is answering RPCs.
@param bus: D-Bus bus to use, or specify None and this object will
create a mainloop and bus.
@param timeout_seconds: float number of seconds to try connecting
A value <= 0 will cause the method to return immediately,
without trying to connect.
@return a ModemManager1Proxy instance if we connected, or None
otherwise.
@raise ModemManager1ProxyError if it fails to connect to
ModemManager1.
"""
def _connect_to_mm1(bus):
try:
# We create instance of class on which this classmethod was
# called. This way, calling
# SubclassOfModemManager1Proxy.get_proxy() will get a proxy of
# the right type.
return cls(bus=bus)
except dbus.exceptions.DBusException as e:
if _is_unknown_dbus_binding_exception(e):
return None
raise ModemManager1ProxyError(
'Error connecting to ModemManager1. DBus error: |%s|',
repr(e))
utils.poll_for_condition(
lambda: _connect_to_mm1(bus) is not None,
exception=ModemManager1ProxyError(
'Timed out connecting to ModemManager1'),
timeout=timeout_seconds,
sleep_interval=ModemManager1Proxy.CONNECT_WAIT_INTERVAL_SECONDS)
connection = _connect_to_mm1(bus)
# Check to make sure ModemManager1 is responding to DBus requests by
# setting the logging to debug.
connection.manager.SetLogging('DEBUG', timeout=timeout_seconds)
return connection
def __init__(self, bus=None):
if bus is None:
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
self._bus = bus
self._manager = dbus.Interface(
self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
mm1_constants.MM1),
mm1_constants.I_MODEM_MANAGER)
@property
def manager(self):
"""@return the DBus ModemManager1 Manager object."""
return self._manager
def get_modem(self):
"""
Return the one and only modem object.
This method distinguishes between no modem and more than one modem.
In the former, this could happen if the modem has not yet surfaced and
is not really considered an error. The caller can wait for the modem
by repeatedly calling this method. In the latter, it is a clear error
condition and an exception will be raised.
Every call to |get_modem| obtains a fresh DBus proxy for the modem. So,
if the modem DBus object has changed between two calls to this method,
the proxy returned will be for the currently exported modem.
@return a ModemProxy object. Return None if no modem is found.
@raise ModemManager1ProxyError unless exactly one modem is found.
"""
try:
object_manager = dbus.Interface(
self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
mm1_constants.MM1),
mm1_constants.I_OBJECT_MANAGER)
modems = object_manager.GetManagedObjects()
except dbus.exceptions.DBusException as e:
raise ModemManager1ProxyError(
'Failed to list the available modems. DBus error: |%s|',
repr(e))
if not modems:
return None
elif len(modems) > 1:
raise ModemManager1ProxyError(
'Expected one modem object, found %d', len(modems))
modem_proxy = ModemProxy(self._bus, modems.keys()[0])
# Check that this object is valid
try:
modem_proxy.modem.GetAll(mm1_constants.I_MODEM,
dbus_interface=mm1_constants.I_PROPERTIES)
return modem_proxy
except dbus.exceptions.DBusException as e:
if _is_unknown_dbus_binding_exception(e):
return None
raise ModemManager1ProxyError(
'Failed to obtain dbus object for the modem. DBus error: '
'|%s|', repr(e))
def wait_for_modem(self, timeout_seconds):
"""
Wait for the modem to appear.
@param timeout_seconds: Number of seconds to wait for modem to appear.
@return a ModemProxy object.
@raise ModemManager1ProxyError if no modem is found within the timeout
or if more than one modem is found. NOTE: This method does not
wait for a second modem. The exception is raised if there is
more than one modem at the time of polling.
"""
return utils.poll_for_condition(
self.get_modem,
exception=ModemManager1ProxyError('No modem found'),
timeout=timeout_seconds)
class ModemProxy(object):
"""A wrapper around a DBus proxy for ModemManager1 modem object."""
# Amount of time to wait for a state transition.
STATE_TRANSITION_WAIT_SECONDS = 10
def __init__(self, bus, path):
self._bus = bus
self._modem = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)
@property
def modem(self):
"""@return the DBus modem object."""
return self._modem
@property
def iface_modem(self):
"""@return org.freedesktop.ModemManager1.Modem DBus interface."""
return dbus.Interface(self._modem, mm1_constants.I_MODEM)
@property
def iface_simple_modem(self):
"""@return org.freedesktop.ModemManager1.Simple DBus interface."""
return dbus.Interface(self._modem, mm1_constants.I_MODEM_SIMPLE)
@property
def iface_gsm_modem(self):
"""@return org.freedesktop.ModemManager1.Modem3gpp DBus interface."""
return dbus.Interface(self._modem, mm1_constants.I_MODEM_3GPP)
@property
def iface_cdma_modem(self):
"""@return org.freedesktop.ModemManager1.ModemCdma DBus interface."""
return dbus.Interface(self._modem, mm1_constants.I_MODEM_CDMA)
@property
def iface_properties(self):
"""@return org.freedesktop.DBus.Properties DBus interface."""
return dbus.Interface(self._modem, dbus.PROPERTIES_IFACE)
def properties(self, iface):
"""Return the properties associated with the specified interface.
@param iface: Name of interface to retrieve the properties from.
@return array of properties.
"""
return self.iface_properties.GetAll(iface)
def get_sim(self):
"""
Return the SIM proxy object associated with this modem.
@return SimProxy object or None if no SIM exists.
"""
sim_path = self.properties(mm1_constants.I_MODEM).get('Sim')
if not sim_path:
return None
sim_proxy = SimProxy(self._bus, sim_path)
# Check that this object is valid
try:
sim_proxy.properties(mm1_constants.I_SIM)
return sim_proxy
except dbus.exceptions.DBusException as e:
if _is_unknown_dbus_binding_exception(e):
return None
raise ModemManager1ProxyError(
'Failed to obtain dbus object for the SIM. DBus error: '
'|%s|', repr(e))
def wait_for_states(self, states,
timeout_seconds=STATE_TRANSITION_WAIT_SECONDS):
"""
Wait for the modem to transition to a state in |states|.
This method does not support transitory states (eg. enabling,
disabling, connecting, disconnecting, etc).
@param states: List of states the modem can transition to.
@param timeout_seconds: Max number of seconds to wait.
@raise ModemManager1ProxyError if the modem does not transition to
one of the accepted states.
"""
for state in states:
if state in [mm1_constants.MM_MODEM_STATE_INITIALIZING,
mm1_constants.MM_MODEM_STATE_DISABLING,
mm1_constants.MM_MODEM_STATE_ENABLING,
mm1_constants.MM_MODEM_STATE_SEARCHING,
mm1_constants.MM_MODEM_STATE_DISCONNECTING,
mm1_constants.MM_MODEM_STATE_CONNECTING]:
raise ModemManager1ProxyError(
'wait_for_states() does not support transitory states.')
utils.poll_for_condition(
lambda: self.properties(mm1_constants.I_MODEM)[
mm1_constants.MM_MODEM_PROPERTY_NAME_STATE] in states,
exception=ModemManager1ProxyError(
'Timed out waiting for modem to enter one of these '
'states: %s, current state=%s',
states,
self.properties(mm1_constants.I_MODEM)[
mm1_constants.MM_MODEM_PROPERTY_NAME_STATE]),
timeout=timeout_seconds)
class SimProxy(object):
"""A wrapper around a DBus proxy for ModemManager1 SIM object."""
def __init__(self, bus, path):
self._bus = bus
self._sim = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)
@property
def sim(self):
"""@return the DBus SIM object."""
return self._sim
@property
def iface_properties(self):
"""@return org.freedesktop.DBus.Properties DBus interface."""
return dbus.Interface(self._sim, dbus.PROPERTIES_IFACE)
@property
def iface_sim(self):
"""@return org.freedesktop.ModemManager1.Sim DBus interface."""
return dbus.Interface(self._sim, mm1_constants.I_SIM)
def properties(self, iface=mm1_constants.I_SIM):
"""Return the properties associated with the specified interface.
@param iface: Name of interface to retrieve the properties from.
@return array of properties.
"""
return self.iface_properties.GetAll(iface)