blob: 7c9a37014c5156e848a7c9513bd80dc3a2e4ed2d [file] [log] [blame] [edit]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, os, tempfile, shutil, stat, time, posix
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
# TODO:
# - mock out TPM and check all error conditions
# - test failure when things aren't mounted correctly
class test_checker(object):
def __init__(self):
logging.info("test_checker.__init__")
# Empty failure list means test passes.
self._failures = []
def _passed(self, msg):
logging.info('ok: %s' % (msg))
def _failed(self, msg):
logging.error('FAIL: %s' % (msg))
self._failures.append(msg)
def _fatal(self, msg):
logging.error('FATAL: %s' % (msg))
raise error.TestError(msg)
def check(self, boolean, msg, fatal=False):
if boolean == True:
self._passed(msg)
else:
msg = "could not satisfy '%s'" % (msg)
if fatal:
self._fatal(msg)
else:
self._failed(msg)
def test_raise(self):
# Raise a failure if anything unexpected was seen.
if len(self._failures):
raise error.TestFail((", ".join(self._failures)))
chk = test_checker()
class EncryptedStateful(object):
def _prepare_simulated_root(self):
os.makedirs(self.var)
os.makedirs(self.chronos)
os.makedirs(self.stateful)
# Build fake stateful block device (emulate 10G sda1).
self.stateful_block = os.path.join(self.root, 'stateful.block')
utils.system("truncate -s 10G %s" % (self.stateful_block))
utils.system("mkfs.ext4 -F %s" % (self.stateful_block))
utils.system("mount -n -t ext4 -o loop,noatime,commit=600 %s %s" %
(self.stateful_block, self.stateful))
def __init__(self, root=None):
if root == None:
self.root = tempfile.mkdtemp(dir='/mnt/stateful_partition',
prefix='.test-enc-stateful-')
self.simulated = True
else:
self.root = root
self.simulated = False
self.var = os.path.join(self.root, 'var')
self.chronos = os.path.join(self.root, 'home', 'chronos')
self.stateful = os.path.join(self.root, 'mnt', 'stateful_partition')
self.mount_log = os.path.join(self.stateful, 'mount.log')
self.key = os.path.join(self.stateful, 'encrypted.key')
self.needs_finalization = os.path.join(self.stateful,
'encrypted.needs-finalization')
self.block = os.path.join(self.stateful, 'encrypted.block')
self.encrypted = os.path.join(self.stateful, 'encrypted')
if self.simulated:
try:
self._prepare_simulated_root()
except:
shutil.rmtree(self.root)
raise
self.mounted = not self.simulated
def mount(self, args=""):
if self.mounted or not self.simulated:
return
# TODO(keescook): figure out what is killing the resizer and
# remove the explicit use of "tee" here.
# Without the pipe to "tee", mount-encrypted's forked resizing
# process gets killed, even though it is using daemon(). (Is
# autotest doing something odd here?) This leaves the filesystem
# unresized. It would be better to have the resizer running in
# the background, as it is designed, so we can examine its behavior
# during testing (e.g. "does the filesystem actually grow?").
utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted %s 2>&1 "
"| tee %s" % (self.root, args, self.mount_log))
self.mounted = True
def umount(self):
if not self.mounted or not self.simulated:
return
utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted umount" %
(self.root))
self.mounted = False
# Clean up when destroyed.
def __del__(self):
if self.simulated:
self.umount()
utils.system("umount -n %s" % (self.stateful))
shutil.rmtree(self.root)
# Perform common post-mount size/owner checks on the filesystem and
# backing files.
def check_sizes(self, finalized=True):
# Do we have the expected backing files?
chk.check(os.path.exists(self.block), "%s exists" % (self.block))
if finalized:
keyfile = self.key
other = self.needs_finalization
else:
keyfile = self.needs_finalization
other = self.key
chk.check(os.path.exists(keyfile), "%s exists" % (keyfile))
chk.check(not os.path.exists(other), "%s does not exist" % (other))
# Sanity check the key file stat.
info = os.stat(keyfile)
chk.check(stat.S_ISREG(info.st_mode),
"%s is regular file" % (keyfile))
chk.check(info.st_uid == 0, "%s is owned by root" % (keyfile))
chk.check(info.st_gid == 0, "%s has group root" % (keyfile))
chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR),
"%s is S_IRUSR | S_IWUSR" % (keyfile))
chk.check(info.st_size == 48, "%s is 48 bytes" % (keyfile))
# Sanity check the block file stat.
info = os.stat(self.block)
chk.check(stat.S_ISREG(info.st_mode),
"%s is regular file" % (self.block))
chk.check(info.st_uid == 0, "%s is owned by root" % (self.block))
chk.check(info.st_gid == 0, "%s has group root" % (self.block))
chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR),
"%s is S_IRUSR | S_IWUSR" % (self.block))
# Make sure block file is roughly a third of the size of the root
# filesystem (within 5%).
top = os.statvfs(self.stateful)
backing_size = float(info.st_size)
third = top.f_blocks * top.f_frsize * .3
chk.check(backing_size > (third * .95)
and backing_size < (third * 1.05),
"%s is near %d bytes (was %d)" % (self.block, third,
info.st_size))
# Wait for resize manager task to finish.
utils.poll_for_condition(lambda: utils.system("pgrep mount-encrypted",
ignore_status=True) != 0,
error.TestError('resizer still running'))
# Verify filesystem is within 5% of the block file size.
info = os.statvfs(self.encrypted)
encrypted_size = float(info.f_frsize) * float(info.f_blocks)
chk.check(encrypted_size / backing_size > 0.95,
"%s fs (%d) is nearly the backing device size (%d)" %
(self.encrypted, encrypted_size, backing_size))
# Verify there is a reasonable number of inodes in the encrypted
# filesystem (near 25% inodes-to-blocks ratio).
inode_ratio = float(info.f_files) / float(info.f_blocks)
chk.check(inode_ratio > 0.20 and inode_ratio < 0.30,
"%s has close to 25%% ratio of inodes-to-blocks (%.2f%%)" %
(self.encrypted, inode_ratio*100))
# Raise non-fatal failures now, if they were encountered.
chk.test_raise()
# Wait for kernel background writing to finish.
def _backing_stabilize(self):
start = None
size = 0
while True:
k = long(utils.system_output("du -sk %s" % (self.block),
retain_output = True).split()[0])
if start == None:
start = k
if k == size:
# Backing file has remained the same size for 10 seconds.
# Assume the kernel is done with background initialization.
break
time.sleep(10)
utils.system("sync")
size = k
logging.info("%s stabilized at %dK (was %dK)" %
(self.block, size, start))
# Check that the backing file reclaims space when filesystem contents
# are deleted.
def check_reclamation(self):
# This test is sensitive to other things happening on the filesystem,
# so we must wait for background initialization to finish first.
self._backing_stabilize()
megs = 200
data = os.path.join(self.var, "check_reclamation")
orig = os.statvfs(self.stateful)
# 200M file added to encrypted filesystem.
utils.system("dd if=/dev/zero of=%s bs=1M count=%s; sync" % (data,
megs))
# Wait for background allocations to finish.
self._backing_stabilize()
filled = os.statvfs(self.stateful)
# 200M file removed from encrypted filesystem.
utils.system("rm %s; sync" % (data))
# Wait for background hole-punching to finish.
self._backing_stabilize()
done = os.statvfs(self.stateful)
# Did the underlying filesystem grow by the size of the test file?
file_blocks_used = float((megs * 1024 * 1024) / orig.f_frsize)
fs_blocks_used = float(orig.f_bfree - filled.f_bfree)
chk.check(file_blocks_used / fs_blocks_used > 0.95,
"%d file blocks account for most of %d fs blocks" %
(file_blocks_used, fs_blocks_used))
# Did the underlying filesystem shrink on removal?
fs_blocks_done = float(orig.f_bfree - done.f_bfree)
chk.check(fs_blocks_done / file_blocks_used < 0.05,
"most of %d fs blocks reclaimed (%d fs blocks left over, "
"free: %d -> %d -> %d)" %
(fs_blocks_used, fs_blocks_done,
orig.f_bfree, filled.f_bfree, done.f_bfree))
# Raise non-fatal failures now, if they were encountered.
chk.test_raise()
class platform_EncryptedStateful(test.test):
version = 1
# With b/80549098, PUNCH_HOLE was disabled for all kernel trees
# before v4.4. This means that the reclamation check will only work
# with kernels that support PUNCH_HOLE.
def is_punch_hole_supported(self):
kernel_ver = os.uname()[2]
if utils.compare_versions(kernel_ver, "4.4") < 0 :
return False
return True
def existing_partition(self):
# Examine the existing encrypted partition.
encstate = EncryptedStateful("/")
# Perform post-mount sanity checks (and handle unfinalized devices).
encstate.check_sizes(finalized=os.path.exists(encstate.key))
def factory_key(self):
# Create test root directory.
encstate = EncryptedStateful()
# Make sure we haven't run here before.
chk.check(not os.path.exists(encstate.key),
"%s does not exist" % (encstate.key))
chk.check(not os.path.exists(encstate.block),
"%s does not exist" % (encstate.block))
# Mount a fresh encrypted stateful, with factory static key.
encstate.mount("factory")
# Perform post-mount sanity checks.
encstate.check_sizes()
# Check disk reclamation for kernels that support PUNCH_HOLE.
if self.is_punch_hole_supported():
encstate.check_reclamation()
# Check explicit umount.
encstate.umount()
def no_tpm(self):
encstate = EncryptedStateful()
# Relocate the TPM device during mount.
tpm = "/dev/tpm0"
off = "%s.off" % (tpm)
try:
if os.path.exists(tpm):
utils.system("mv %s %s" % (tpm, off))
# Mount without a TPM.
encstate.mount()
finally:
if os.path.exists(off):
utils.system("mv %s %s" % (off, tpm))
# Perform post-mount sanity checks.
encstate.check_sizes(finalized=False)
def run_once(self):
# Do a no-write test of system's existing encrypted partition.
self.existing_partition()
# Do a no-write, no-TPM test with sanity checks.
self.no_tpm()
# There is no interactively controllable TPM mock yet for
# mount-encrypted, so we can only test the static key currently.
self.factory_key()