| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import os |
| import pyudev |
| import re |
| import select |
| import struct |
| import subprocess |
| import threading |
| import time |
| from autotest_lib.client.common_lib import error |
| |
| |
| JAIL_CONTROL_PATH = '/dev/jail-control' |
| JAIL_REQUEST_PATH = '/dev/jail-request' |
| |
| # From linux/device_jail.h. |
| REQUEST_ALLOW = 0 |
| REQUEST_ALLOW_WITH_LOCKDOWN = 1 |
| REQUEST_ALLOW_WITH_DETACH = 2 |
| REQUEST_DENY = 3 |
| |
| |
| class OSFile: |
| """Simple context manager for file descriptors.""" |
| def __init__(self, path, flag): |
| self._fd = os.open(path, flag) |
| |
| def close(self): |
| os.close(self._fd) |
| |
| def __enter__(self): |
| """Returns the fd so it can be used in with-blocks.""" |
| return self._fd |
| |
| def __exit__(self, exc_type, exc_val, traceback): |
| self.close() |
| |
| |
| class ConcurrentFunc: |
| """Simple context manager that starts and joins a thread.""" |
| def __init__(self, target_func, timeout_func): |
| self._thread = threading.Thread(target=target_func) |
| self._timeout_func = timeout_func |
| self._target_name = target_func.__name__ |
| |
| def __enter__(self): |
| self._thread.start() |
| |
| def __exit__(self, exc_type, exc_val, traceback): |
| self._thread.join(self._timeout_func()) |
| if self._thread.is_alive() and not exc_val: |
| raise error.TestError('Function %s timed out' % self._target_name) |
| |
| |
| class JailDevice: |
| TIMEOUT_SEC = 3 |
| PATH_MAX = 4096 |
| |
| def __init__(self, path_to_jail): |
| self._path_to_jail = path_to_jail |
| |
| |
| def __enter__(self): |
| """ |
| Creates a jail device for the device located at self._path_to_jail. |
| If the jail already exists, don't take ownership of it. |
| """ |
| try: |
| output = subprocess.check_output( |
| ['device_jail_utility', |
| '--add={0}'.format(self._path_to_jail)], |
| stderr=subprocess.STDOUT) |
| |
| match = re.search('created jail at (.*)', output) |
| if match: |
| self._path = match.group(1) |
| self._owns_device = True |
| return self |
| |
| match = re.search('jail already exists at (.*)', output) |
| if match: |
| self._path = match.group(1) |
| self._owns_device = False |
| return self |
| |
| raise error.TestError('Failed to create device jail') |
| except subprocess.CalledProcessError as e: |
| raise error.TestError('Failed to call device_jail_utility') |
| |
| |
| def expect_open(self, verdict): |
| """ |
| Tries to open the jail device. This method mocks out the |
| device_jail request server which is normally run by permission_broker. |
| This allows us to set the verdict we want to test. Since the open |
| call will block until we return the verdict, we have to use a |
| separate thread to perform the open call, as well. |
| """ |
| # Python 2 does not support "nonlocal" so this closure can't |
| # set the values of identifiers it closes over unless they |
| # are in global scope. Work around this by using a list and |
| # value-mutation. |
| dev_file_wrapper = [None] |
| def open_device(): |
| try: |
| dev_file_wrapper[0] = OSFile(self._path, os.O_RDWR) |
| except OSError as e: |
| # We don't throw an error because this might be intentional, |
| # such as when the verdict is REQUEST_DENY. |
| logging.info("Failed to open jail device: %s", e.strerror) |
| |
| # timeout_sec should be used for the timeouts below. |
| # This ensures we don't spend much longer than TIMEOUT_SEC in |
| # this method. |
| deadline = time.time() + self.TIMEOUT_SEC |
| def timeout_sec(): |
| return max(deadline - time.time(), 0.01) |
| |
| # We have to use FDs because polling works with FDs and |
| # buffering is silly. |
| try: |
| req_f = OSFile(JAIL_REQUEST_PATH, os.O_RDWR) |
| except OSError as e: |
| raise error.TestError( |
| 'Failed to open request device: %s' % e.strerror) |
| |
| with req_f as req_fd: |
| poll_obj = select.poll() |
| poll_obj.register(req_fd, select.POLLIN) |
| |
| # Starting open_device should ensure we have a request waiting |
| # on the request device. |
| with ConcurrentFunc(open_device, timeout_sec): |
| ready_fds = poll_obj.poll(timeout_sec() * 1000) |
| if not ready_fds: |
| raise error.TestError('Timed out waiting for jail-request') |
| |
| # Sanity check the request. |
| path = os.read(req_fd, self.PATH_MAX) |
| logging.info('Received jail-request for path %s', path) |
| if path != self._path_to_jail: |
| raise error.TestError('Got request for the wrong path') |
| |
| os.write(req_fd, struct.pack('I', verdict)) |
| logging.info('Responded to jail-request') |
| |
| return dev_file_wrapper[0] |
| |
| |
| def __exit__(self, exc_type, exc_val, traceback): |
| if self._owns_device: |
| subprocess.call(['device_jail_utility', |
| '--remove={0}'.format(self._path)]) |
| |
| |
| def get_usb_devices(): |
| context = pyudev.Context() |
| return [device for device in context.list_devices() |
| if device.device_node and device.device_node.startswith('/dev/bus/usb')] |