blob: c739d4edf951696b160a2e81c4634ffc2326b773 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import errno
import logging
import os
import os.path
import pyudev
import stat
import subprocess
import tempfile
import threading
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import device_jail_utils
class security_DeviceJail_Filesystem(test.test):
"""
Ensures that if the device jail filesystem is present, it restricts
the devices that can be seen and instantiates jails.
"""
version = 1
SHUTDOWN_TIMEOUT_SEC = 5
def warmup(self):
super(security_DeviceJail_Filesystem, self).warmup()
if not os.path.exists(device_jail_utils.JAIL_CONTROL_PATH):
raise error.TestNAError('Device jail is not present')
self._mount = tempfile.mkdtemp(prefix='djfs_test_')
logging.debug('Attempting to mount device_jail_fs on %s', self._mount)
try:
self._subprocess = subprocess.Popen(['device_jail_fs', self._mount])
except OSError as e:
if e.errno == errno.ENOENT:
raise error.TestNAError('Device jail FS is not present')
else:
raise error.TestError(
'Device jail FS failed to start: %s' % e.strerror)
def _is_jail_device(self, filename):
context = pyudev.Context()
device = pyudev.Device.from_device_file(context, filename)
return device.subsystem == 'device_jail'
def _check_device(self, filename):
logging.debug('Checking device %s', filename)
# Devices outside of the FS are fine.
if not filename.startswith(self._mount):
return
# Remove mount from full path.
relative_dev_path = filename[len(self._mount) + 1:]
# Check if this is a passthrough device.
if relative_dev_path in ['null', 'full', 'zero', 'urandom']:
return
# Ensure USB devices are jailed.
if relative_dev_path.startswith('bus/usb'):
if self._is_jail_device(filename):
return
else:
raise error.TestError('Device should be jailed: %s' % filename)
# All other devices should be hidden.
raise error.TestError('Device should be hidden: %s' % filename)
def run_once(self):
for dirpath, _, filenames in os.walk(self._mount):
for filename in filenames:
real_path = os.path.realpath(os.path.join(dirpath, filename))
try:
mode = os.stat(real_path).st_mode
except OSError as e:
continue
if stat.S_ISCHR(mode):
self._check_device(real_path)
def _clean_shutdown(self):
subprocess.check_call(['fusermount', '-u', self._mount])
self._subprocess.wait()
def _hard_shutdown(self):
logging.warn('Timeout expired, killing device_jail_fs')
self._subprocess.kill()
self._subprocess.wait()
def cleanup(self):
super(security_DeviceJail_Filesystem, self).cleanup()
if hasattr(self, '_subprocess'):
logging.info('Waiting %d seconds for device_jail_fs shutdown',
self.SHUTDOWN_TIMEOUT_SEC)
timeout = threading.Timer(self.SHUTDOWN_TIMEOUT_SEC,
self._hard_shutdown)
timeout.start()
self._clean_shutdown()
timeout.cancel()
try:
os.rmdir(self._mount)
except OSError as e:
raise error.TestError('Failed to remove temp dir: %s' % e.strerror)