blob: 21bf53cb766502d8c656ae2243c1a245a5d68a5c [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Forward USB devices from the caller to the target device.
Automates the process of forwading specified USB devices to a target device.
This involves:
1) Loading the prerequiste kernel modules (both locally and on the target
device).
2) Running and cleaning up `usbipd`.
3) Setting up a SSH tunnel for the `usbipd` TCP port.
4) Bind the devices to the usbip kernel driver.
5) Attach the devices to the target.
6) Clean up on exit so that the USB devices will function again locally.
For example:
./forward_usb_devices.py --log-level=debug -d test-device.local 1-3.1 1-3.2
will forward two USB devices (the ones at bus ids 1-3.1 and 1-3.2) to the device
with the hostname `test-device.local`.
"""
from __future__ import print_function
import contextlib
import logging
import os
import shutil
import signal
import subprocess
import sys
import time
from chromite.lib import commandline
from chromite.lib import cros_build_lib
from chromite.lib import osutils
from chromite.lib import remote_access
from chromite.lib import retry_util
HOST_MODULES = {'usbip-core', 'usbip-host', 'vhci-hcd'}
CLIENT_MODULES = {'usbip-core', 'vhci-hcd'}
KILL_COMMAND = 'kill'
MODPROBE_COMMAND = 'modprobe'
USBIP_PACKAGE = 'usbip'
USBIP_COMMAND = 'usbip'
USBIPD_COMMAND = 'usbipd'
USBIPD_PID_FILE = '/run/usbipd.pid'
USBIPD_PORT = 3240
RETRY_USBIPD_READ_PID = 10
DELAY_USBIPD_READ_PID = 0.5
def main(argv):
"""Forward USB devices from the caller to the target device."""
os.environ['PATH'] += ':/sbin:/usr/sbin'
opts = get_opts(argv)
return forward_devices(opts.device.hostname, opts.device.port,
opts.usb_devices)
def forward_devices(hostname, port, usb_devices):
"""Forward USB devices from the caller to the target device."""
if shutil.which(USBIP_COMMAND) is not None:
logging.debug('`%s` found in the chroot', USBIP_COMMAND)
else:
logging.error(
'You need to emerge the `%s` package in the chroot: sudo emerge %s',
USBIP_PACKAGE, USBIP_PACKAGE)
logging.debug('Connectiong to root@%s:%s`', hostname, port)
with contextlib.ExitStack() as stack:
device = stack.enter_context(
remote_access.ChromiumOSDeviceHandler(hostname=hostname, port=port,
username='root'))
if device.HasProgramInPath(USBIP_COMMAND):
logging.debug('`%s` found on the device', USBIP_COMMAND)
else:
logging.error(
'You need to emerge and deploy the `%s` packge to the test '
'device: emerge-${{BOARD}} %s && cros deploy '
'--board=${{BOARD}} %s',
USBIP_PACKAGE, USBIP_PACKAGE, USBIP_PACKAGE)
return False
tunnel_is_alive = stack.enter_context(setup_usbip_tunnel(device))
if not load_modules(device=device):
return False
if not stack.enter_context(start_usbipd()):
return False
for busid in usb_devices:
if not stack.enter_context(bind_usb_device(busid)):
return False
if not tunnel_is_alive():
logging.error('SSH tunnel is dead. Aborting.')
return False
for i, busid in enumerate(usb_devices):
if not stack.enter_context(attach_usb_device(device, busid, i)):
return False
# Catch SIGINT, SIGTERM, and SIGHUP.
try:
signal.signal(signal.SIGINT, signal.default_int_handler)
signal.signal(signal.SIGTERM, signal.default_int_handler)
signal.signal(signal.SIGHUP, signal.default_int_handler)
logging.info('Ready. Press Ctrl-C (SIGINT) to cleanup.')
while True:
time.sleep(60)
except KeyboardInterrupt:
pass
logging.debug('Cleanup complete.')
return True
def get_opts(argv):
"""Parse the command-line options."""
parser = commandline.ArgumentParser(description=forward_devices.__doc__)
parser.add_argument(
'-d', '--device',
type=commandline.DeviceParser(commandline.DEVICE_SCHEME_SSH),
help='The target device to forward the USB devices to '
'(hostname[:port]).')
parser.add_argument('usb_devices', nargs='+',
help='Bus identifiers of USB devices to forward')
opts = parser.parse_args(argv)
opts.Freeze()
return opts
def load_modules(device=None):
"""Load prerequiste kernel modules.
The modules will first be loaded on the calling machine. If device is set,
the prerequiste kernel for the target device will also be loaded.
"""
for module in HOST_MODULES:
try:
cros_build_lib.sudo_run([MODPROBE_COMMAND, module])
except cros_build_lib.RunCommandError:
logging.error('Failed to load module on host: %s', module)
return False
logging.debug('Loaded module on host: %s', module)
if device is not None:
for module in CLIENT_MODULES:
try:
device.run([MODPROBE_COMMAND, module])
except cros_build_lib.RunCommandError:
logging.error('Failed to load module on target: %s', module)
return False
logging.debug('Loaded module on target: %s', module)
return True
@contextlib.contextmanager
def start_usbipd():
"""Starts the `usbipd` daemon in the background.
On cleanup kills the daemon.
Returns:
False on failure.
"""
try:
cros_build_lib.sudo_run(
[USBIPD_COMMAND, '-D', '-P%s' % USBIPD_PID_FILE])
except cros_build_lib.RunCommandError:
logging.error('Failed to start: %s', USBIPD_COMMAND)
yield False
return
logging.debug('Started on host: %s', USBIPD_COMMAND)
# Give the daemon a chance to write the PID file.
pid = retry_util.GenericRetry(
handler=lambda e: isinstance(e, FileNotFoundError),
max_retry=RETRY_USBIPD_READ_PID,
functor=lambda: int(osutils.ReadFile(USBIPD_PID_FILE).strip()),
sleep=DELAY_USBIPD_READ_PID)
yield True
logging.debug('Killing `usbipd` (%d).', pid)
cros_build_lib.sudo_run([KILL_COMMAND, str(pid)])
@contextlib.contextmanager
def setup_usbip_tunnel(device):
"""Tunnels the `usbip` port over SSH to the target device.
On cleanup tears down the tunnel by killing the tunnel process.
Returns:
A callback to check if the tunnel is still alive.
"""
proc = device.GetAgent().CreateTunnel(
to_remote=[remote_access.PortForwardSpec(local_port=USBIPD_PORT)])
def alive():
"""Returns `True` if the SSH tunnel process is still alive."""
return proc.poll() is None
yield alive
logging.debug('Stopping `usbip` tunnel.')
proc.terminate()
try:
proc.wait(timeout=10)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
@contextlib.contextmanager
def bind_usb_device(busid):
"""Binds the USB device at `busid` to usbip driver so it can be exported.
On cleanup unbinds the usb device.
Returns:
False on failure.
"""
try:
cros_build_lib.sudo_run([USBIP_COMMAND, 'bind', '-b', busid])
except cros_build_lib.RunCommandError:
logging.error('Failed to bind: %s', busid)
yield False
return
logging.debug('Bound: %s', busid)
yield True
logging.debug('unbinding: %s', busid)
cros_build_lib.sudo_run([USBIP_COMMAND, 'unbind', '-b', busid])
@contextlib.contextmanager
def attach_usb_device(device, busid, port):
"""Attaches the specified busid using `usbip`.
On cleanup detaches the USB device at the specified `usbip` port number.
Returns:
False on failure.
"""
try:
device.run([USBIP_COMMAND, 'attach', '-r', 'localhost', '-b', busid])
except cros_build_lib.RunCommandError:
logging.error('Failed to attach: %s', busid)
yield False
return
logging.debug('Attached: %s', busid)
yield True
try:
device.run([USBIP_COMMAND, 'detach', '-p', str(port)])
logging.debug('Detached usbip port: %s', port)
except cros_build_lib.RunCommandError:
logging.error('Failed to detach: %s', port)
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))