| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import dbus, grp, os, pwd, stat |
| from dbus.mainloop.glib import DBusGMainLoop |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib.cros import policy, session_manager |
| from autotest_lib.client.cros import cros_ui, cryptohome, ownership |
| |
| |
| class login_UserPolicyKeys(test.test): |
| """Verifies that, after user policy is pushed, the user policy key winds |
| up stored in the right place. |
| """ |
| version = 1 |
| |
| def _can_read(self, uid, gid, info): |
| """Returns true if uid or gid can read a file with the info stat.""" |
| if uid == info.st_uid: |
| return info.st_mode & stat.S_IRUSR |
| if gid == info.st_gid: |
| return info.st_mode & stat.S_IRGRP |
| return info.st_mode & stat.S_IROTH |
| |
| |
| def _can_execute(self, uid, gid, info): |
| """Returns true if uid or gid can execute a file with the info stat.""" |
| if uid == info.st_uid: |
| return info.st_mode & stat.S_IXUSR |
| if gid == info.st_gid: |
| return info.st_mode & stat.S_IXGRP |
| return info.st_mode & stat.S_IXOTH |
| |
| |
| def _verify_key_file(self, key_file): |
| """Verifies that the key file has been created and is readable.""" |
| if not os.path.isfile(key_file): |
| raise error.TestFail('%s does not exist!' % key_file) |
| # And is readable by chronos. |
| chronos_uid = pwd.getpwnam('chronos').pw_uid |
| chronos_gid = grp.getgrnam('chronos').gr_gid |
| info = os.stat(key_file) |
| if not stat.S_ISREG(info.st_mode): |
| raise error.TestFail('%s is not a regular file' % key_file) |
| if not self._can_read(chronos_uid, chronos_gid, info): |
| raise error.TestFail('chronos can\' read %s, mode is %s' % |
| (key_file, oct(info.st_mode))) |
| # All the parent directories must be executable by chronos. |
| current = key_file |
| parent = os.path.dirname(current) |
| while current != parent: |
| current = parent |
| parent = os.path.dirname(parent) |
| info = os.stat(current) |
| mode = stat.S_IMODE(info.st_mode) |
| if not self._can_execute(chronos_uid, chronos_gid, info): |
| raise error.TestFail('chronos can\'t execute %s, mode is %s' % |
| (current, oct(info.st_mode))) |
| |
| |
| def initialize(self): |
| super(login_UserPolicyKeys, self).initialize() |
| policy.install_protobufs(self.autodir, self.job) |
| self._bus_loop = DBusGMainLoop(set_as_default=True) |
| self._cryptohome_proxy = cryptohome.CryptohomeProxy( |
| self._bus_loop, self.autodir, self.job) |
| |
| # Clear the user's vault, to make sure the test starts without any |
| # policy or key lingering around. At this stage the session isn't |
| # started and there's no user signed in. |
| ownership.restart_ui_to_clear_ownership_files() |
| self._cryptohome_proxy.remove(ownership.TESTUSER) |
| |
| |
| def run_once(self): |
| # Mount the vault, connect to session_manager and start the session. |
| self._cryptohome_proxy.mount(ownership.TESTUSER, |
| ownership.TESTPASS, |
| create=True) |
| sm = session_manager.connect(self._bus_loop) |
| sm.StartSession(ownership.TESTUSER, '') |
| |
| # No policy stored yet. |
| retrieved_policy = sm.RetrievePolicyEx( |
| session_manager.make_user_policy_descriptor(ownership.TESTUSER), |
| byte_arrays=True) |
| if retrieved_policy: |
| raise error.TestError('session_manager already has user policy!') |
| |
| # And no user key exists. |
| key_file = ownership.get_user_policy_key_filename(ownership.TESTUSER) |
| if os.path.exists(key_file): |
| raise error.TestFail('%s exists before storing user policy!' % |
| key_file) |
| |
| # Now store a policy. This is building a device policy protobuf, but |
| # that's fine as far as the session_manager is concerned; it's the |
| # outer PolicyFetchResponse that contains the public_key. |
| public_key = ownership.known_pubkey() |
| private_key = ownership.known_privkey() |
| policy_data = policy.build_policy_data() |
| policy_response = policy.generate_policy(private_key, |
| public_key, |
| policy_data) |
| try: |
| sm.StorePolicyEx( |
| session_manager.make_user_policy_descriptor(ownership.TESTUSER), |
| dbus.ByteArray(policy_response)) |
| except dbus.exceptions.DBusException as e: |
| raise error.TestFail('Failed to store user policy', e) |
| |
| # The policy key should have been created now. |
| self._verify_key_file(key_file) |
| |
| # Restart the ui; the key should be deleted. |
| self._cryptohome_proxy.unmount(ownership.TESTUSER) |
| cros_ui.restart() |
| if os.path.exists(key_file): |
| raise error.TestFail('%s exists after restarting ui!' % |
| key_file) |
| |
| # Starting a new session will restore the key that was previously |
| # stored. Reconnect to the session_manager, since the restart killed it. |
| self._cryptohome_proxy.mount(ownership.TESTUSER, |
| ownership.TESTPASS, |
| create=True) |
| sm = session_manager.connect(self._bus_loop) |
| sm.StartSession(ownership.TESTUSER, '') |
| self._verify_key_file(key_file) |
| |
| |
| def cleanup(self): |
| cros_ui.restart() |
| self._cryptohome_proxy.remove(ownership.TESTUSER) |
| super(login_UserPolicyKeys, self).cleanup() |