| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import errno |
| import logging |
| import os |
| import os.path |
| import pyudev |
| import stat |
| import subprocess |
| import tempfile |
| import threading |
| from autotest_lib.client.bin import test, utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import device_jail_utils |
| |
| |
class security_DeviceJail_Filesystem(test.test):
    """
    Ensures that if the device jail filesystem is present, it restricts
    the devices that can be seen and instantiates jails.

    The test mounts device_jail_fs on a temporary directory, walks the
    mount, and checks every character device found inside it: a fixed set
    of passthrough devices is accepted as-is, devices under bus/usb must
    belong to the device_jail subsystem, and nothing else may be visible.
    """
    version = 1

    # Seconds cleanup() waits for the unmount before device_jail_fs is killed.
    SHUTDOWN_TIMEOUT_SEC = 5

    def warmup(self):
        """
        Starts device_jail_fs on a freshly created temporary directory.

        Stores the directory in self._mount and the launched process in
        self._subprocess.

        @raises error.TestNAError: if the jail control path does not exist
                or the device_jail_fs executable cannot be found.
        @raises error.TestError: if launching device_jail_fs fails for any
                other reason.
        """
        super(security_DeviceJail_Filesystem, self).warmup()
        if not os.path.exists(device_jail_utils.JAIL_CONTROL_PATH):
            raise error.TestNAError('Device jail is not present')

        self._mount = tempfile.mkdtemp(prefix='djfs_test_')
        logging.debug('Attempting to mount device_jail_fs on %s', self._mount)
        try:
            self._subprocess = subprocess.Popen(
                    ['device_jail_fs', self._mount])
        except OSError as e:
            # ENOENT means the executable itself is missing, i.e. the
            # feature is not installed rather than broken.
            if e.errno == errno.ENOENT:
                raise error.TestNAError('Device jail FS is not present')
            raise error.TestError(
                    'Device jail FS failed to start: %s' % e.strerror)


    def _is_jail_device(self, filename):
        """
        Reports whether a device node belongs to the device_jail subsystem.

        @param filename: path of the device file to look up through udev.
        @returns True if udev reports the subsystem as 'device_jail'.
        """
        context = pyudev.Context()
        device = pyudev.Device.from_device_file(context, filename)
        return device.subsystem == 'device_jail'


    def _check_device(self, filename):
        """
        Verifies that a character device is allowed to be visible.

        @param filename: symlink-resolved path of the device node.
        @raises error.TestError: if the device is inside the mount and is
                neither a passthrough device nor a jailed USB device.
        """
        logging.debug('Checking device %s', filename)
        # run_once() hands over symlink-resolved paths, so resolve the mount
        # point the same way; otherwise a temp directory reached through a
        # symlink would make every device look like it is outside the FS.
        mount_root = os.path.realpath(self._mount)

        # Devices outside of the FS are fine. Match a whole path component
        # so a sibling directory sharing the prefix is not taken for the
        # mount itself.
        if not filename.startswith(mount_root + os.sep):
            return

        # Remove mount from full path.
        relative_dev_path = filename[len(mount_root) + 1:]

        # Check if this is a passthrough device.
        if relative_dev_path in ('null', 'full', 'zero', 'urandom'):
            return

        # Ensure USB devices are jailed.
        if relative_dev_path.startswith('bus/usb'):
            if self._is_jail_device(filename):
                return
            raise error.TestError('Device should be jailed: %s' % filename)

        # All other devices should be hidden.
        raise error.TestError('Device should be hidden: %s' % filename)


    def run_once(self):
        """
        Walks the mount and checks every character device found in it.

        Entries that cannot be stat'ed are skipped.

        @raises error.TestError: propagated from _check_device() for a
                device that should have been jailed or hidden.
        """
        for dirpath, _, filenames in os.walk(self._mount):
            for filename in filenames:
                real_path = os.path.realpath(os.path.join(dirpath, filename))
                try:
                    mode = os.stat(real_path).st_mode
                except OSError:
                    continue

                if stat.S_ISCHR(mode):
                    self._check_device(real_path)


    def _clean_shutdown(self):
        """
        Unmounts the filesystem and waits for device_jail_fs to exit.

        @raises subprocess.CalledProcessError: if fusermount exits non-zero.
        """
        subprocess.check_call(['fusermount', '-u', self._mount])
        self._subprocess.wait()


    def _hard_shutdown(self):
        """
        Kills device_jail_fs and reaps it.

        Invoked from the timer started in cleanup() when the clean shutdown
        has not finished within SHUTDOWN_TIMEOUT_SEC.
        """
        logging.warning('Timeout expired, killing device_jail_fs')
        self._subprocess.kill()
        self._subprocess.wait()


    def cleanup(self):
        """
        Shuts down device_jail_fs and removes the temporary mount point.

        Each step only runs if warmup() got far enough to create the
        corresponding resource, so this is safe after an early bail-out.

        @raises subprocess.CalledProcessError: propagated from
                _clean_shutdown() if the unmount command fails.
        @raises error.TestError: if the mount point cannot be removed.
        """
        super(security_DeviceJail_Filesystem, self).cleanup()
        if hasattr(self, '_subprocess'):
            logging.info('Waiting %d seconds for device_jail_fs shutdown',
                         self.SHUTDOWN_TIMEOUT_SEC)
            timeout = threading.Timer(self.SHUTDOWN_TIMEOUT_SEC,
                                      self._hard_shutdown)
            timeout.start()
            # cancel() is only reached when the clean shutdown returns; if
            # it raises, the timer stays armed and _hard_shutdown() still
            # runs once the timeout elapses.
            self._clean_shutdown()
            timeout.cancel()

        # warmup() raises before creating the directory when the device jail
        # is not present, so there may be nothing to remove.
        if hasattr(self, '_mount'):
            try:
                os.rmdir(self._mount)
            except OSError as e:
                raise error.TestError(
                        'Failed to remove temp dir: %s' % e.strerror)