blob: 9b8c83960fda0c531e76fc8c41fd104073f84397 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus, logging, os, tempfile
import common, constants, cros_ui, cryptohome
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import autotemp, error
class OwnershipError(error.TestError):
"""Generic error for ownership-related failures."""
pass
class scoped_tempfile(object):
"""A wrapper that provides scoped semantics for temporary files.
Providing a file path causes the scoped_tempfile to take ownership of the
file at the provided path. The file at the path will be deleted when this
object goes out of scope. If no path is provided, then a temporary file
object will be created for the lifetime of the scoped_tempfile
autotemp.tempfile objects don't seem to play nicely with being
used in system commands, so they can't be used for my purposes.
"""
tempdir = autotemp.tempdir(unique_id=__module__)
def __init__(self, name=None):
self.name = name
if not self.name:
self.fo = tempfile.TemporaryFile()
def __del__(self):
if self.name:
if os.path.exists(self.name):
os.unlink(self.name)
else:
self.fo.close() # Will destroy the underlying tempfile
def system_output_on_fail(cmd):
"""Run a |cmd|, capturing output and logging it only on error."""
output = None
try:
output = utils.system_output(cmd)
except:
logging.error(output)
raise
def __unlink(filename):
try:
os.unlink(filename)
except (IOError, OSError) as error:
logging.info(error)
def clear_ownership():
__unlink(constants.OWNER_KEY_FILE)
__unlink(constants.SIGNED_POLICY_FILE)
def connect_to_session_manager():
"""Create and return a DBus connection to session_manager.
Connects to the session manager over the DBus system bus. Returns
appropriately configured DBus interface object.
"""
bus = dbus.SystemBus()
proxy = bus.get_object('org.chromium.SessionManager',
'/org/chromium/SessionManager')
return dbus.Interface(proxy, 'org.chromium.SessionManagerInterface')
def listen_to_session_manager_signal(callback, signal):
"""Create and return a DBus connection to session_manager.
Connects to the session manager over the DBus system bus. Returns
appropriately configured DBus interface object.
"""
bus = dbus.SystemBus()
bus.add_signal_receiver(
handler_function=callback,
signal_name=signal,
dbus_interface='org.chromium.Chromium',
bus_name=None,
path='/org/chromium/SessionManager')
POLICY_TYPE = 'google/chromeos/device'
def assert_has_policy_data(response_proto):
if not response_proto.HasField("policy_data"):
raise OwnershipError('Malformatted response.')
def assert_has_device_settings(data_proto):
if (not data_proto.HasField("policy_type") or
data_proto.policy_type != POLICY_TYPE or
not data_proto.HasField("policy_value")):
raise OwnershipError('Malformatted response.')
def assert_username(data_proto, username):
if data_proto.username != username:
raise OwnershipError('Incorrect username.')
def assert_guest_setting(settings, guests):
if not settings.HasField("guest_mode_enabled"):
raise OwnershipError('No guest mode setting protobuf.')
if not settings.guest_mode_enabled.HasField("guest_mode_enabled"):
raise OwnershipError('No guest mode setting.')
if settings.guest_mode_enabled.guest_mode_enabled != guests:
raise OwnershipError('Incorrect guest mode setting.')
def assert_show_users(settings, show_users):
if not settings.HasField("show_user_names"):
raise OwnershipError('No show users setting protobuf.')
if not settings.show_user_names.HasField("show_user_names"):
raise OwnershipError('No show users setting.')
if settings.show_user_names.show_user_names != show_users:
raise OwnershipError('Incorrect show users setting.')
def assert_roaming(settings, roaming):
if not settings.HasField("data_roaming_enabled"):
raise OwnershipError('No roaming setting protobuf.')
if not settings.data_roaming_enabled.HasField("data_roaming_enabled"):
raise OwnershipError('No roaming setting.')
if settings.data_roaming_enabled.data_roaming_enabled != roaming:
raise OwnershipError('Incorrect roaming setting.')
def assert_new_users(settings, new_users):
if not settings.HasField("allow_new_users"):
raise OwnershipError('No allow new users setting protobuf.')
if not settings.allow_new_users.HasField("allow_new_users"):
raise OwnershipError('No allow new users setting.')
if settings.allow_new_users.allow_new_users != new_users:
raise OwnershipError('Incorrect allow new users setting.')
def assert_users_on_whitelist(settings, users):
if settings.HasField("user_whitelist"):
for user in users:
if user not in settings.user_whitelist.user_whitelist:
raise OwnershipError(user + ' not whitelisted.')
else:
raise OwnershipError('No user whitelist.')
def assert_proxy_settings(settings, proxies):
if not settings.HasField("device_proxy_settings"):
raise OwnershipError('No proxy settings protobuf.')
if not settings.device_proxy_settings.HasField("proxy_mode"):
raise OwnershipError('No proxy_mode setting.')
if settings.device_proxy_settings.proxy_mode != proxies['proxy_mode']:
raise OwnershipError('Incorrect proxies: %s' % proxies)
NSSDB = constants.CRYPTOHOME_MOUNT_PT + '/.pki/nssdb'
PK12UTIL = 'nsspk12util'
OPENSSLP12 = 'openssl pkcs12'
OPENSSLX509 = 'openssl x509'
OPENSSLRSA = 'openssl rsa'
OPENSSLREQ = 'openssl req'
OPENSSLCRYPTO = 'openssl sha1'
def use_known_ownerkeys():
"""Sets the system up to use a well-known keypair for owner operations.
Assuming the appropriate cryptohome is already mounted, configures the
device to accept policies signed with the checked-in 'mock' owner key.
"""
dirname = os.path.dirname(__file__)
mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY)
mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
push_to_nss(mock_keyfile, mock_certfile, NSSDB)
utils.open_write_close(constants.OWNER_KEY_FILE,
cert_extract_pubkey_der(mock_certfile))
def known_privkey():
"""Returns the mock owner private key in PEM format.
"""
dirname = os.path.dirname(__file__)
return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY))
def known_pubkey():
"""Returns the mock owner public key in DER format.
"""
dirname = os.path.dirname(__file__)
return cert_extract_pubkey_der(os.path.join(dirname,
constants.MOCK_OWNER_CERT))
def pairgen():
"""Generate a self-signed cert and associated private key.
Generates a self-signed X509 certificate and the associated private key.
The key is 2048 bits. The generated material is stored in PEM format
and the paths to the two files are returned.
The caller is responsible for cleaning up these files.
"""
keyfile = scoped_tempfile.tempdir.name + '/private.key'
certfile = scoped_tempfile.tempdir.name + '/cert.pem'
cmd = '%s -x509 -subj %s -newkey rsa:2048 -nodes -keyout %s -out %s' % (
OPENSSLREQ, '/CN=me', keyfile, certfile)
system_output_on_fail(cmd)
return (keyfile, certfile)
def pairgen_as_data():
"""Generates keypair, returns keys as data.
Generates a fresh owner keypair and then passes back the
PEM-formatted private key and the DER-encoded public key.
"""
(keypath, certpath) = pairgen()
keyfile = scoped_tempfile(keypath)
certfile = scoped_tempfile(certpath)
return (utils.read_file(keyfile.name),
cert_extract_pubkey_der(certfile.name))
def push_to_nss(keyfile, certfile, nssdb):
"""Takes a pre-generated key pair and pushes them to an NSS DB.
Given paths to a private key and cert in PEM format, stores the pair
in the provided nssdb.
"""
for_push = scoped_tempfile(scoped_tempfile.tempdir.name + '/for_push.p12')
cmd = '%s -export -in %s -inkey %s -out %s ' % (
OPENSSLP12, certfile, keyfile, for_push.name)
cmd += '-passin pass: -passout pass:'
system_output_on_fail(cmd)
cmd = '%s -d "sql:%s" -i %s -W ""' % (PK12UTIL,
nssdb,
for_push.name)
system_output_on_fail(cmd)
def generate_owner_creds():
"""Generates a keypair, registered with NSS, and returns key and cert.
Generates a fresh self-signed cert and private key. Registers them
with NSS and then passes back paths to files containing the
PEM-formatted private key and certificate.
"""
(keyfile, certfile) = pairgen()
push_to_nss(keyfile, certfile, NSSDB)
return (keyfile, certfile)
def cert_extract_pubkey_der(pem):
"""Given a PEM-formatted cert, extracts the public key in DER format.
Pass in an X509 certificate in PEM format, and you'll get back the
DER-formatted public key as a string.
"""
outfile = scoped_tempfile(scoped_tempfile.tempdir.name + '/pubkey.der')
cmd = '%s -in %s -pubkey -noout ' % (OPENSSLX509, pem)
cmd += '| %s -outform DER -pubin -out %s' % (OPENSSLRSA,
outfile.name)
system_output_on_fail(cmd)
der = utils.read_file(outfile.name)
return der
def generate_and_register_keypair(testuser, testpass):
"""Generates keypair, registers with NSS, sets owner key, returns keypair.
Generates a fresh owner keypair. Registers keys with NSS,
puts the owner public key in the right place, ensures that the
session_manager picks it up, ensures the owner's home dir is
mounted, and then passes back the PEM-formatted private key and the
DER-encoded public key.
"""
(keypath, certpath) = generate_owner_creds()
keyfile = scoped_tempfile(keypath)
certfile = scoped_tempfile(certpath)
pubkey = cert_extract_pubkey_der(certfile.name)
utils.open_write_close(constants.OWNER_KEY_FILE, pubkey)
cros_ui.nuke()
cryptohome.mount_vault(testuser, testpass, create=False)
return (utils.read_file(keyfile.name), pubkey)
def sign(pem_key, data):
"""Signs |data| with key from |pem_key|, returns signature.
Using the PEM-formatted private key in |pem_key|, generates an
RSA-with-SHA1 signature over |data| and returns the signature in
a string.
"""
sig = scoped_tempfile()
err = scoped_tempfile()
data_file = scoped_tempfile()
data_file.fo.write(data)
data_file.fo.seek(0)
pem_key_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pkey.pem')
utils.open_write_close(pem_key_file.name, pem_key)
cmd = '%s -sign %s' % (OPENSSLCRYPTO, pem_key_file.name)
try:
utils.run(cmd,
stdin=data_file.fo,
stdout_tee=sig.fo,
stderr_tee=err.fo)
except:
err.fo.seek(0)
logging.error(err.fo.read())
raise
sig.fo.seek(0)
sig_data = sig.fo.read()
if not sig_data:
raise error.OwnershipError('Empty signature!')
return sig_data