blob: b47a21d17f8c87c602d7df98edf95402118dd03a [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus
import grp
import os
import pwd
import stat
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import cros_ownership_test
from autotest_lib.client.cros import cros_ui
from autotest_lib.client.cros import cryptohome
from autotest_lib.client.cros import ownership
class login_UserPolicyKeys(cros_ownership_test.OwnershipTest):
version = 1
def _can_read(self, uid, gid, info):
"""Returns true if uid or gid can read a file with the info stat."""
if uid == info.st_uid:
return info.st_mode & stat.S_IRUSR
if gid == info.st_gid:
return info.st_mode & stat.S_IRGRP
return info.st_mode & stat.S_IROTH
def _can_execute(self, uid, gid, info):
"""Returns true if uid or gid can execute a file with the info stat."""
if uid == info.st_uid:
return info.st_mode & stat.S_IXUSR
if gid == info.st_gid:
return info.st_mode & stat.S_IXGRP
return info.st_mode & stat.S_IXOTH
def _verify_key_file(self, key_file):
"""Verifies that the key file has been created and is readable."""
if not os.path.isfile(key_file):
raise error.TestFail('%s does not exist!' % key_file)
# And is readable by chronos.
chronos_uid = pwd.getpwnam('chronos').pw_uid
chronos_gid = grp.getgrnam('chronos').gr_gid
info = os.stat(key_file)
if not stat.S_ISREG(info.st_mode):
raise error.TestFail('%s is not a regular file' % key_file)
if not self._can_read(chronos_uid, chronos_gid, info):
raise error.TestFail('chronos can\' read %s, mode is %s' %
(key_file, oct(info.st_mode)))
# All the parent directories must be executable by chronos.
current = key_file
parent = os.path.dirname(current)
while current != parent:
current = parent
parent = os.path.dirname(parent)
info = os.stat(current)
mode = stat.S_IMODE(info.st_mode)
if not self._can_execute(chronos_uid, chronos_gid, info):
raise error.TestFail('chronos can\'t execute %s, mode is %s' %
(current, oct(info.st_mode)))
def setup(self):
os.chdir(self.srcdir)
utils.make('OUT_DIR=.')
def initialize(self):
super(login_UserPolicyKeys, self).initialize()
# Clear the user's vault, to make sure the test starts without any
# policy or key lingering around. At this stage the session isn't
# started and there's no user signed in.
cryptohome.remove_vault(self._testuser)
def run_once(self):
# Mount the vault, connect to session_manager and start the session.
cryptohome.mount_vault(self._testuser, self._testpass, create=True)
session_manager = self.connect_to_session_manager()
if not session_manager.StartSession(self._testuser, ''):
raise error.TestError('Could not start session')
# No policy stored yet.
retrieved_policy = session_manager.RetrieveUserPolicy(byte_arrays=True)
if retrieved_policy:
raise error.TestError('session_manager already has user policy!')
# And no user key exists.
key_file = ownership.get_user_policy_key_filename(self._testuser)
if os.path.exists(key_file):
raise error.TestFail('%s exists before storing user policy!' %
key_file)
# Now store a policy. This is building a device policy protobuf, but
# that's fine as far as the session_manager is concerned; it's the
# outer PolicyFetchResponse that contains the public_key.
public_key = ownership.known_pubkey()
private_key = ownership.known_privkey()
policy_data = self.build_policy_data()
policy_response = self.generate_policy(private_key,
public_key,
policy_data)
try:
result = session_manager.StoreUserPolicy(
dbus.ByteArray(policy_response))
if not result:
raise error.TestFail('Failed to store user policy')
except dbus.exceptions.DBusException, e:
raise error.TestFail('Failed to store user policy', e)
# The policy key should have been created now.
self._verify_key_file(key_file)
# Restart the ui; the key should be deleted.
cryptohome.unmount_vault()
cros_ui.restart()
if os.path.exists(key_file):
raise error.TestFail('%s exists after restarting ui!' %
key_file)
# Starting a new session will restore the key that was previously
# stored. Reconnect to the session_manager, since the restart killed it.
cryptohome.mount_vault(self._testuser, self._testpass, create=True)
session_manager = self.connect_to_session_manager()
if not session_manager.StartSession(self._testuser, ''):
raise error.TestError('Could not start session after restart')
self._verify_key_file(key_file)
def cleanup(self):
cryptohome.unmount_vault()
super(login_UserPolicyKeys, self).cleanup()