| #!/usr/bin/env python |
| |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import dbus |
| import dbus.mainloop.glib |
| import gobject |
| import logging |
| import optparse |
| import os |
| import signal |
| import subprocess |
| import sys |
| import time |
| |
| import mm1 |
| import modemmanager |
| import modem_3gpp |
| import sim |
| |
| import common |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import virtual_ethernet_pair |
| |
| |
# Name of the local end of the virtual ethernet pair.
IFACE_NAME = 'pseudomodem0'
# Name of the peer end of the pair; dnsmasq is bound to this interface.
PEER_IFACE_NAME = '%sp' % IFACE_NAME
# First three octets of the /24 network used on the virtual ethernet pair.
IFACE_IP_BASE = '192.168.7'
# Default value of the --carrier command line option.
DEFAULT_CARRIER = 'banana'
# Upstart jobs of the real modem managers that are stopped while the pseudo
# modem manager is in use.
# TODO(armansito): Remove 'cromo' once it gets deprecated.
DEFAULT_MANAGERS = ['cromo', 'modemmanager']
# Seconds the parent process sleeps after forking the pseudo modem manager
# child.
PARENT_SLEEP_TIMEOUT = 2
| |
class TestModemManagerContext(object):
    """
    Context manager that optionally replaces the real modem managers with a
    PseudoModemManager for the duration of a `with` block.

    When use_pseudomodem is False, entering and leaving the context does
    nothing.

    """

    def __init__(self, use_pseudomodem,
                 real_managers=None,
                 sim=None,
                 modem=None):
        """
        Args:
            use_pseudomodem -- Whether or not the context should create a
                               pseudo modem manager.

            real_managers -- Array containing the names of real modem manager
                             daemons that need to be stopped before starting
                             the pseudo modem manager,
                             e.g. ['cromo', 'modemmanager']. When omitted (or
                             None) a copy of DEFAULT_MANAGERS is used.

            sim -- An instance of sim.SIM. This is required for 3GPP modems
                   as it encapsulates information about the carrier.

            modem -- An instance of a modem.Modem subclass. If none is provided
                     the default modem is an instance of modem_3gpp.Modem3gpp.

        """
        self.use_pseudomodem = use_pseudomodem
        # Resolve the default here instead of in the signature so that the
        # module-level list is never shared between instances.
        if real_managers is None:
            real_managers = list(DEFAULT_MANAGERS)
        self.real_managers = real_managers
        self.pseudo_modem = modem
        self.sim = sim
        self.pseudo_modem_manager = None

    def _RunInitCommand(self, action):
        """
        Runs '/sbin/<action> <manager>' for every real modem manager.

        Failures are logged and otherwise ignored; a manager that is not
        installed or is already in the requested state must not abort the
        test.

        Args:
            action -- Either 'stop' or 'start'.

        """
        for modem_manager in self.real_managers:
            try:
                utils.run('/sbin/%s %s' % (action, modem_manager))
            except error.CmdError:
                logging.info('Ignoring failure of "/sbin/%s %s".',
                             action, modem_manager)

    def __enter__(self):
        """
        Stops the real modem managers and starts the pseudo modem manager.

        Returns:
            This context object.

        """
        if self.use_pseudomodem:
            self._RunInitCommand('stop')
            try:
                self.pseudo_modem_manager = PseudoModemManager(
                        modem=self.pseudo_modem, sim=self.sim)
                self.pseudo_modem_manager.Start()
            except Exception:
                # __exit__ is not invoked when __enter__ raises, so bring the
                # real managers back here before propagating the failure.
                self._RunInitCommand('start')
                raise
        return self

    def __exit__(self, *args):
        """
        Stops the pseudo modem manager and restarts the real modem managers.

        Exceptions raised inside the `with` block are not suppressed.

        """
        if not self.use_pseudomodem:
            return
        try:
            if self.pseudo_modem_manager is not None:
                self.pseudo_modem_manager.Stop()
        finally:
            # Restart the real managers even if stopping the pseudo modem
            # manager failed.
            self._RunInitCommand('start')
| |
| |
class VirtualEthernetInterface(object):
    """
    Manages the virtual ethernet pair used by the pseudo modem and the
    dnsmasq DHCP server that runs on the peer end of the pair.

    """

    def __init__(self):
        """Creates the (not yet set up) virtual ethernet pair object."""
        self.vif = virtual_ethernet_pair.VirtualEthernetPair(
                interface_name=IFACE_NAME,
                peer_interface_name=PEER_IFACE_NAME,
                interface_ip=None,
                peer_interface_ip=IFACE_IP_BASE + '.1/24')
        # subprocess.Popen handle of the running dnsmasq, or None.
        self.dnsmasq = None

    def BringIfaceUp(self):
        """Brings IFACE_NAME up using ifconfig."""
        utils.run('sudo ifconfig %s up' % IFACE_NAME)

    def BringIfaceDown(self):
        """Brings IFACE_NAME down using ifconfig."""
        utils.run('sudo ifconfig %s down' % IFACE_NAME)

    def StartDHCPServer(self):
        """
        Starts dnsmasq on PEER_IFACE_NAME, handing out addresses
        IFACE_IP_BASE.2 through IFACE_IP_BASE.254.

        """
        lease_file = '/tmp/dnsmasq.%s.leases' % IFACE_NAME
        # Create the lease file, or empty it if it already exists. O_TRUNC is
        # only well defined together with a write access mode.
        os.close(os.open(lease_file,
                         os.O_WRONLY | os.O_CREAT | os.O_TRUNC))
        self.dnsmasq = subprocess.Popen(
                ['sudo',
                 '/usr/local/sbin/dnsmasq',
                 '--pid-file',
                 '-k',
                 '--dhcp-leasefile=' + lease_file,
                 '--dhcp-range=%s.2,%s.254' % (
                     IFACE_IP_BASE, IFACE_IP_BASE),
                 '--port=0',
                 '--interface=' + PEER_IFACE_NAME,
                 '--bind-interfaces'
                ])

    def StopDHCPServer(self):
        """
        Terminates dnsmasq, if it is running, and waits for it to exit.

        Waiting reaps the child process and guarantees that the old server
        is gone before RestartDHCPServer starts a new one.

        """
        server, self.dnsmasq = self.dnsmasq, None
        if server is None:
            return
        if server.poll() is None:
            server.terminate()
        server.wait()

    def RestartDHCPServer(self):
        """Stops and then starts the DHCP server."""
        self.StopDHCPServer()
        self.StartDHCPServer()

    def Setup(self):
        """
        Creates the virtual ethernet pair, adds a host route for the
        broadcast address on the peer interface and starts the DHCP server.

        Raises:
            Exception if the virtual ethernet pair reports it is not healthy.

        """
        self.vif.setup()
        self.BringIfaceDown()
        if not self.vif.is_healthy:
            raise Exception('Could not initialize virtual ethernet pair')
        # NOTE(review): presumably needed so that DHCP broadcast replies
        # leave through the peer interface -- confirm.
        utils.run('sudo route add -host 255.255.255.255 dev ' +
                  PEER_IFACE_NAME)
        self.StartDHCPServer()

    def Teardown(self):
        """
        Stops the DHCP server, removes the broadcast route and destroys the
        virtual ethernet pair.

        The pair is torn down even if one of the earlier steps fails.

        """
        try:
            self.StopDHCPServer()
            try:
                utils.run('sudo route del -host 255.255.255.255 dev ' +
                          PEER_IFACE_NAME)
            except error.CmdError:
                # Best effort: the route may never have been added.
                logging.info('Could not remove the broadcast route on %s.',
                             PEER_IFACE_NAME)
        finally:
            self.vif.teardown()

    def Restart(self):
        """Tears the interface down and sets it up again."""
        self.Teardown()
        self.Setup()
| |
# Module-level instance used by PseudoModemManager: Start() calls its Setup()
# and _Cleanup() calls its Teardown().
virtual_ethernet_interface = VirtualEthernetInterface()
| |
class PseudoModemManager(object):
    """
    This class is responsible for setting up the virtual ethernet interfaces,
    initializing the DBus objects and running the main loop.

    This class can be utilized either using Python's with statement, or by
    calling Start and Stop:

        with PseudoModemManager(modem, sim):
            ... do stuff ...

    or

        pmm = PseudoModemManager(modem, sim)
        pmm.Start()
        ... do stuff ...
        pmm.Stop()

    The PseudoModemManager constructor takes a variable called "detach". If a
    value of True is given, the PseudoModemManager will run the main loop in
    a child process. This is particularly useful when using PseudoModemManager
    in an autotest:

        with PseudoModemManager(modem, sim, detach=True):
            ... This will run the modem manager in the background while this
            block executes. When the code in this block finishes running, the
            PseudoModemManager will automatically kill the child process.

    If detach=False, then the pseudo modem manager will run the main process
    until the process exits. PseudoModemManager is created with detach=False
    when this file is run as an executable.

    """

    def __init__(self,
                 modem=None,
                 sim=None,
                 detach=True,
                 logfile=None):
        """
        Args:
            modem -- The modem object to export. When not provided, a
                     modem_3gpp.Modem3gpp is created.

            sim -- Optional SIM object that is handed to the modem through
                   SetSIM before the modem is added to the manager.

            detach -- Whether the main loop runs in a forked child process
                      (True) or blocks the calling process (False).

            logfile -- File name passed to logging.basicConfig.

        """
        # TODO(armansito): The following line just doesn't work.
        logging.basicConfig(format='%(asctime)-15s %(message)s',
                            filename=logfile,
                            level=logging.DEBUG)
        if not modem:
            # Create default modem
            modem = modem_3gpp.Modem3gpp()
        self.modem = modem
        self.sim = sim
        self.detach = detach
        # PID of the forked child in the parent, 0 inside the child, None
        # while no child is running.
        self.child = None

    def __enter__(self):
        """Starts the pseudo modem manager and returns this object."""
        self.Start()
        return self

    def __exit__(self, *args):
        """Stops the pseudo modem manager; exceptions are not suppressed."""
        self.Stop()

    def Start(self):
        """
        Sets up the virtual ethernet interface and runs the main loop.

        With detach=True the main loop runs in a forked child and the parent
        returns after sleeping PARENT_SLEEP_TIMEOUT seconds. With
        detach=False this call blocks until the main loop quits.

        """
        # TODO(armansito): See crosbug.com/36235

        virtual_ethernet_interface.Setup()
        if not self.detach:
            self._Run()
            return

        self.child = os.fork()
        if self.child == 0:
            # Child process. It must never return into the caller's code:
            # once the main loop quits the child would otherwise go on to
            # execute whatever the parent runs after Start().
            exit_status = 1
            try:
                self._Run()
                exit_status = 0
            except Exception:
                logging.exception('Pseudo modem manager child failed.')
            finally:
                # os._exit skips the cleanup handlers inherited from the
                # parent process.
                os._exit(exit_status)
        time.sleep(PARENT_SLEEP_TIMEOUT)

    def Stop(self):
        """
        Stops the child process, if one is running, and tears down the
        virtual ethernet interface.

        Calling Stop when no child process exists only performs the cleanup.

        """
        if self.detach and self.child == 0:
            # Inside the forked child; the parent owns the cleanup.
            return
        try:
            if self.detach and self.child:
                child_pid, self.child = self.child, None
                try:
                    os.kill(child_pid, signal.SIGINT)
                    os.waitpid(child_pid, 0)
                except OSError:
                    # The child is already gone; nothing left to stop.
                    logging.warning(
                            'Could not stop pseudo modem manager child %d.',
                            child_pid)
        finally:
            self._Cleanup()

    def _Cleanup(self):
        """Tears down the shared virtual ethernet interface."""
        virtual_ethernet_interface.Teardown()

    def _Run(self):
        """
        Exports the modem manager and the modem on the system bus and runs
        the glib main loop until SIGINT or SIGTERM is received.

        Raises:
            Exception if no modem object is available.

        """
        if not self.modem:
            raise Exception('No modem object has been provided.')
        # The top of this file only imports dbus and dbus.mainloop.glib, so
        # import the dbus.service submodule explicitly instead of relying on
        # another module having loaded it.
        import dbus.service
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
        # Keep a reference for as long as the main loop runs: dbus-python
        # releases the bus name when the BusName object is unreferenced.
        name = dbus.service.BusName(mm1.I_MODEM_MANAGER, bus)
        self.manager = modemmanager.ModemManager(bus)

        self.modem.SetBus(bus)
        if self.sim:
            self.modem.SetSIM(self.sim)
        self.manager.Add(self.modem)

        mainloop = gobject.MainLoop()

        def SignalHandler(signum, frame):
            logging.info('Signal handler called with signal %s', signum)
            self.manager.Remove(self.modem)
            mainloop.quit()

        signal.signal(signal.SIGINT, SignalHandler)
        signal.signal(signal.SIGTERM, SignalHandler)

        mainloop.run()

    def SendTextMessage(self, sender_no, text):
        """Not implemented yet; does nothing."""
        # TODO(armansito): Implement
        pass
| |
| |
def Start(options):
    """
    Runs a PseudoModemManager in the foreground with a default GSM SIM.

    Because the manager is created with detach=False, entering the context
    blocks until its main loop quits; leaving the context stops the manager.

    Args:
        options -- Parsed command line options. Only options.logfile is
                   used at the moment.

    """
    # TODO(armansito): Use options here to figure out the correct
    # modem to create
    carrier = sim.SIM.Carrier()
    default_sim = sim.SIM(carrier, mm1.MM_MODEM_ACCESS_TECHNOLOGY_GSM)
    manager = PseudoModemManager(sim=default_sim,
                                 detach=False,
                                 logfile=options.logfile)
    with manager:
        pass
| |
def main():
    """Parses the command line and starts the pseudo modem manager."""
    usage = ('\n'
             '\n'
             'Run pseudomodem to simulate a modem using the '
             'modemmanager-next\n'
             'DBus interfaces.\n'
             '\n'
             'Use --help for info.\n'
             '\n')

    # (flags, keyword arguments) for each supported command line option.
    option_specs = (
        (('-c', '--carrier'),
         dict(dest='carrier_name',
              default=DEFAULT_CARRIER,
              metavar='<carrier name>',
              help='<carrier name> := anything')),
        (('-l', '--logfile'),
         dict(dest='logfile',
              default=None,
              metavar='<filename>',
              help='<filename> := filename for logging output')),
        (('-t', '--technology'),
         dict(dest='tech',
              default='3GPP',
              metavar='<technology>',
              help='<technology> := 3GPP|CDMA|LTE')),
    )

    parser = optparse.OptionParser(usage=usage)
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)

    # Positional arguments are ignored.
    options, _ = parser.parse_args()

    Start(options)
| |
| |
# Run the pseudo modem manager only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()