blob: 2e6706ae811f018f509806d16086624d6b70cb2e [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import pyudev
import re
import select
import struct
import subprocess
import threading
import time
from autotest_lib.client.common_lib import error
JAIL_CONTROL_PATH = '/dev/jail-control'
JAIL_REQUEST_PATH = '/dev/jail-request'
# From linux/device_jail.h.
REQUEST_ALLOW = 0
REQUEST_ALLOW_WITH_LOCKDOWN = 1
REQUEST_ALLOW_WITH_DETACH = 2
REQUEST_DENY = 3
class OSFile:
"""Simple context manager for file descriptors."""
def __init__(self, path, flag):
self._fd = os.open(path, flag)
def close(self):
os.close(self._fd)
def __enter__(self):
"""Returns the fd so it can be used in with-blocks."""
return self._fd
def __exit__(self, exc_type, exc_val, traceback):
self.close()
class ConcurrentFunc:
"""Simple context manager that starts and joins a thread."""
def __init__(self, target_func, timeout_func):
self._thread = threading.Thread(target=target_func)
self._timeout_func = timeout_func
self._target_name = target_func.__name__
def __enter__(self):
self._thread.start()
def __exit__(self, exc_type, exc_val, traceback):
self._thread.join(self._timeout_func())
if self._thread.is_alive() and not exc_val:
raise error.TestError('Function %s timed out' % self._target_name)
class JailDevice:
TIMEOUT_SEC = 3
PATH_MAX = 4096
def __init__(self, path_to_jail):
self._path_to_jail = path_to_jail
def __enter__(self):
"""
Creates a jail device for the device located at self._path_to_jail.
If the jail already exists, don't take ownership of it.
"""
try:
output = subprocess.check_output(
['device_jail_utility',
'--add={0}'.format(self._path_to_jail)],
stderr=subprocess.STDOUT)
match = re.search('created jail at (.*)', output)
if match:
self._path = match.group(1)
self._owns_device = True
return self
match = re.search('jail already exists at (.*)', output)
if match:
self._path = match.group(1)
self._owns_device = False
return self
raise error.TestError('Failed to create device jail')
except subprocess.CalledProcessError as e:
raise error.TestError('Failed to call device_jail_utility')
def expect_open(self, verdict):
"""
Tries to open the jail device. This method mocks out the
device_jail request server which is normally run by permission_broker.
This allows us to set the verdict we want to test. Since the open
call will block until we return the verdict, we have to use a
separate thread to perform the open call, as well.
"""
# Python 2 does not support "nonlocal" so this closure can't
# set the values of identifiers it closes over unless they
# are in global scope. Work around this by using a list and
# value-mutation.
dev_file_wrapper = [None]
def open_device():
try:
dev_file_wrapper[0] = OSFile(self._path, os.O_RDWR)
except OSError as e:
# We don't throw an error because this might be intentional,
# such as when the verdict is REQUEST_DENY.
logging.info("Failed to open jail device: %s", e.strerror)
# timeout_sec should be used for the timeouts below.
# This ensures we don't spend much longer than TIMEOUT_SEC in
# this method.
deadline = time.time() + self.TIMEOUT_SEC
def timeout_sec():
return max(deadline - time.time(), 0.01)
# We have to use FDs because polling works with FDs and
# buffering is silly.
try:
req_f = OSFile(JAIL_REQUEST_PATH, os.O_RDWR)
except OSError as e:
raise error.TestError(
'Failed to open request device: %s' % e.strerror)
with req_f as req_fd:
poll_obj = select.poll()
poll_obj.register(req_fd, select.POLLIN)
# Starting open_device should ensure we have a request waiting
# on the request device.
with ConcurrentFunc(open_device, timeout_sec):
ready_fds = poll_obj.poll(timeout_sec() * 1000)
if not ready_fds:
raise error.TestError('Timed out waiting for jail-request')
# Sanity check the request.
path = os.read(req_fd, self.PATH_MAX)
logging.info('Received jail-request for path %s', path)
if path != self._path_to_jail:
raise error.TestError('Got request for the wrong path')
os.write(req_fd, struct.pack('I', verdict))
logging.info('Responded to jail-request')
return dev_file_wrapper[0]
def __exit__(self, exc_type, exc_val, traceback):
if self._owns_device:
subprocess.call(['device_jail_utility',
'--remove={0}'.format(self._path)])
def get_usb_devices():
context = pyudev.Context()
return [device for device in context.list_devices()
if device.device_node and device.device_node.startswith('/dev/bus/usb')]