blob: f4aa6e21663bf9d1e4b23d03ed7952eeb49276bd [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, os, re
import dbus
import common
import constants as chromeos_constants
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
CRYPTOHOME_CMD = '/usr/sbin/cryptohome'
class ChromiumOSError(error.InstallError):
"""Generic error for ChromiumOS-specific exceptions."""
def __run_cmd(cmd):
return utils.system_output(cmd + ' 2>&1', retain_output=True,
def get_user_hash(user):
"""Get the hash for the test user account."""
hash_cmd = CRYPTOHOME_CMD + ' --action=obfuscate_user --user=%s' % user
return __run_cmd(hash_cmd)
def get_tpm_status():
"""Get the TPM status.
A TPM status dictionary, for example:
{ 'Enabled': True,
'Owned': True,
'Being Owned': False,
'Ready': True,
'Password': ''
out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_status')
status = {}
for field in ['Enabled', 'Owned', 'Being Owned', 'Ready']:
match ='TPM %s: (true|false)' % field, out)
if not match:
raise ChromiumOSError('Invalid TPM status: "%s".' % out)
status[field] = == 'true'
match ='TPM Password: (\w*)', out)
status['Password'] = ''
if match:
status['Password'] =
return status
def take_tpm_ownership():
"""Take TPM owernship.
Blocks until TPM is owned.
__run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership')
__run_cmd(CRYPTOHOME_CMD + ' --action=tpm_wait_ownership')
def remove_vault(user):
"""Remove the test user account."""
logging.debug('user is %s', user)
user_hash = get_user_hash(user)
logging.debug('Removing vault for user %s - %s' % (user, user_hash))
cmd = CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user
# Ensure that the user directory does not exist
if os.path.exists(os.path.join('/home/.shadow/', user_hash)):
raise ChromiumOSError('Cryptohome could not remove the test user.')
def mount_vault(user, password, create=False):
cmd = (CRYPTOHOME_CMD + ' --action=mount --user=%s --password=%s' %
(user, password))
if create:
cmd += ' --create'
# Ensure that the user directory exists
user_hash = get_user_hash(user)
if not os.path.exists(os.path.join('/home/.shadow/', user_hash)):
raise ChromiumOSError('Cryptohome vault not found after mount.')
# Ensure that the user directory is mounted
if not is_mounted(allow_fail=True):
raise ChromiumOSError('Cryptohome created the user but did not mount.')
def test_auth(user, password):
cmd = (CRYPTOHOME_CMD + ' --action=test_auth --user=%s --password=%s' %
(user, password))
return 'Authentication succeeded' in __run_cmd(cmd)
def unmount_vault(user=None):
Unmount the directory. Once unmount-by-user is supported, the user
parameter will name the target user. See
cmd = (CRYPTOHOME_CMD + ' --action=unmount')
# Ensure that the user directory is not mounted
if is_mounted(allow_fail=True):
raise ChromiumOSError('Cryptohome did not unmount the user.')
def __get_mount_parts(expected_mountpt=chromeos_constants.CRYPTOHOME_MOUNT_PT,
allow_fail = False):
mount_line = utils.system_output(
'grep %s /proc/$(pgrep cryptohomed)/mounts' % expected_mountpt,
ignore_status = allow_fail)
return mount_line.split()
def current_mounted_vault(device=chromeos_constants.CRYPTOHOME_DEVICE_REGEX,
mount_line = utils.system_output(
'grep %s /proc/$(pgrep cryptohomed)/mounts' % expected_mountpt,
mount_parts = mount_line.split()
if len(mount_parts) > 0 and re.match(device, mount_parts[0]):
return mount_parts[0]
return None
def is_mounted(device=chromeos_constants.CRYPTOHOME_DEVICE_REGEX,
return None != current_mounted_vault(device=device,
def is_mounted_on_tmpfs(device = chromeos_constants.CRYPTOHOME_INCOGNITO,
expected_mountpt =
allow_fail = False):
mount_parts = __get_mount_parts(device, allow_fail)
return (len(mount_parts) > 2 and device == mount_parts[0] and
'tmpfs' == mount_parts[2])
def canonicalize(credential):
"""Perform basic canonicalization of |email_address|
Perform basic canonicalization of |email_address|, taking
into account that gmail does not consider '.' or caps inside a
username to matter. It also ignores everything after a '+'.
For example, ==, per
if not credential:
return None
parts = credential.split('@')
if len(parts) != 2:
raise error.TestError('Malformed email: ' + credential)
(name, domain) = parts
name = name.partition('+')[0]
if (domain == chromeos_constants.SPECIAL_CASE_DOMAIN):
name = name.replace('.', '')
return '@'.join([name, domain]).lower()
def user_path(user):
return utils.system_output('cryptohome-path user %s' % user)
def system_path(user):
return utils.system_output('cryptohome-path system %s' % user)
class CryptohomeProxy:
def __init__(self):
BUSNAME = 'org.chromium.Cryptohome'
PATH = '/org/chromium/Cryptohome'
INTERFACE = 'org.chromium.CryptohomeInterface'
bus = dbus.SystemBus()
obj = bus.get_object(BUSNAME, PATH)
self.iface = dbus.Interface(obj, INTERFACE)
def mount(self, user, password, create=False):
"""Mounts a cryptohome.
Returns True if the mount succeeds or False otherwise.
TODO(ellyjones): Migrate mount_vault() to use a multi-user-safe
heuristic, then remove this method. See <>.
return self.iface.Mount(user, password, create, False, [])[1]
def unmount(self, user):
"""Unmounts a cryptohome.
Returns True if the unmount suceeds or false otherwise.
TODO(ellyjones): Once there's a per-user unmount method, use it. See
return self.iface.Unmount()
def is_mounted(self, user):
"""Tests whether a user's cryptohome is mounted."""
return (utils.is_mountpoint(user_path(user))
and utils.is_mountpoint(system_path(user)))
def require_mounted(self, user):
"""Raises a test failure if a user's cryptohome is not mounted."""