blob: 8adf140dc5e1324651a933603477db9865a17af7 [file] [log] [blame]
#!/usr/bin/python2.7
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This implements a simple tool to create policy blobs signed with a given
key. It can create both device and user policies. The output will consist of
two files a policy file and the owner.key file which contains the policy
signature.
The input file is JSON. The root dictionary contains a list under the
key "managed_users". Keys in the root dictionary identify request scopes.
The user-request scope is described by a dictionary that holds two
sub-dictionaries: "mandatory" and "recommended". Both these hold the policy
definitions as key/value stores, their format is identical to what the Linux
implementation reads from /etc.
The device-scope holds the policy-definition directly as key/value stores
in the protobuf-format.
Example:
{
"google/chromeos/device" : {
"guest_mode_enabled" : false
},
"google/chromeos/user" : {
"mandatory" : {
"HomepageLocation" : "http://www.chromium.org",
"IncognitoEnabled" : false
},
"recommended" : {
"JavascriptEnabled": false
}
}
}
"""
import optparse
import os
import re
import sys
import time
import tlslite
import tlslite.api
import tlslite.utils
# The name and availability of the json module varies in python versions.
try:
import simplejson as json
except ImportError:
try:
import json
except ImportError:
json = None
import asn1der
import device_management_backend_pb2 as dm
import cloud_policy_pb2 as cp
import chrome_device_policy_pb2 as dp
# ASN.1 object identifier for PKCS#1/RSA.
PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'
def SetProtobufMessageField(group_message, field, field_value):
'''Sets a field in a protobuf message.
Args:
group_message: The protobuf message.
field: The field of the message to set, it shuold be a member of
group_message.DESCRIPTOR.fields.
field_value: The value to set.
'''
if field.label == field.LABEL_REPEATED:
assert type(field_value) == list
entries = group_message.__getattribute__(field.name)
for list_item in field_value:
entries.append(list_item)
return
elif field.type == field.TYPE_BOOL:
assert type(field_value) == bool
elif field.type == field.TYPE_STRING:
assert type(field_value) == str or type(field_value) == unicode
elif field.type == field.TYPE_INT64:
assert type(field_value) == int
elif (field.type == field.TYPE_MESSAGE and
field.message_type.name == 'StringList'):
assert type(field_value) == list
entries = group_message.__getattribute__(field.name).entries
for list_item in field_value:
entries.append(list_item)
return
else:
raise Exception('Unknown field type %s' % field.type)
group_message.__setattr__(field.name, field_value)
def GatherDevicePolicySettings(settings, policies):
'''Copies all the policies from a dictionary into a protobuf of type
CloudDeviceSettingsProto.
Args:
settings: The destination ChromeDeviceSettingsProto protobuf.
policies: The source dictionary containing policies in JSON format.
'''
for group in settings.DESCRIPTOR.fields:
# Create protobuf message for group.
group_message = eval('dp.' + group.message_type.name + '()')
# Indicates if at least one field was set in |group_message|.
got_fields = False
# Iterate over fields of the message and feed them from the
# policy config file.
for field in group_message.DESCRIPTOR.fields:
field_value = None
if field.name in policies:
got_fields = True
field_value = policies[field.name]
SetProtobufMessageField(group_message, field, field_value)
if got_fields:
settings.__getattribute__(group.name).CopyFrom(group_message)
def GatherUserPolicySettings(settings, policies):
'''Copies all the policies from a dictionary into a protobuf of type
CloudPolicySettings.
Args:
settings: The destination: a CloudPolicySettings protobuf.
policies: The source: a dictionary containing policies under keys
'recommended' and 'mandatory'.
'''
for group in settings.DESCRIPTOR.fields:
# Create protobuf message for group.
group_message = eval('cp.' + group.message_type.name + '()')
# We assume that this policy group will be recommended, and only switch
# it to mandatory if at least one of its members is mandatory.
group_message.policy_options.mode = cp.PolicyOptions.RECOMMENDED
# Indicates if at least one field was set in |group_message|.
got_fields = False
# Iterate over fields of the message and feed them from the
# policy config file.
for field in group_message.DESCRIPTOR.fields:
field_value = None
if field.name in policies['mandatory']:
group_message.policy_options.mode = cp.PolicyOptions.MANDATORY
field_value = policies['mandatory'][field.name]
elif field.name in policies['recommended']:
field_value = policies['recommended'][field.name]
if field_value != None:
got_fields = True
SetProtobufMessageField(group_message, field, field_value)
if got_fields:
settings.__getattribute__(group.name).CopyFrom(group_message)
def ProcessCloudPolicy(policy_type,
policy_def, policy_key,
username,
output_path):
"""Creates a policy blob.
Encodes the policy into protobuf representation, signs it and saves it.
Args:
policy_type: can be 'google/chromeos/user' or 'google/chromeos/device'.
policy_def: The JSON file containing the policy definition.
policy_key: A private key to be used to sign the blob.
username: Username to be integrated in the policy blob.
output_path: A directory where to put the output files.
"""
policy = json.loads(open(policy_def).read())
policy_value = ''
if (policy_type in policy):
if policy_type == 'google/chromeos/user':
settings = cp.CloudPolicySettings()
GatherUserPolicySettings(settings, policy[policy_type])
policy_value = settings.SerializeToString()
elif policy_type == 'google/chromeos/device':
settings = dp.ChromeDeviceSettingsProto()
GatherDevicePolicySettings(settings, policy[policy_type])
policy_value = settings.SerializeToString()
key = tlslite.api.parsePEMKey(open(policy_key).read(), private=True)
algorithm = asn1der.Sequence(
[ asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID),
asn1der.Data(asn1der.NULL, '') ])
rsa_pubkey = asn1der.Sequence([ asn1der.Integer(key.n),
asn1der.Integer(key.e) ])
pubkey = asn1der.Sequence([ algorithm, asn1der.Bitstring(rsa_pubkey) ])
key_version = 1
# Fill the policy data protobuf.
policy_data = dm.PolicyData()
policy_data.policy_type = policy_type
policy_data.timestamp = int(time.time() * 1000)
policy_data.request_token = "DEV_TOKEN"
policy_data.policy_value = policy_value
policy_data.machine_name = "MEAN_MACHINE"
policy_data.public_key_version = 1
policy_data.username = username
policy_data.device_id = "1337_1D"
signed_data = policy_data.SerializeToString()
response = dm.DeviceManagementResponse()
fetch_response = response.policy_response.responses.add()
fetch_response.policy_data = signed_data
fetch_response.policy_data_signature = bytes(
key.hashAndSign(signed_data))
fetch_response.new_public_key = pubkey
open("%s/policy" % output_path,"wb").
write(fetch_response.SerializeToString());
open("%s/owner.key" % output_path,"wb").write(pubkey);
def main(options):
ProcessCloudPolicy(options.policy_type,
options.policy_def, options.policy_key,
options.policy_user,
options.output_path);
if __name__ == '__main__':
option_parser = optparse.OptionParser()
option_parser.add_option('-k', '--policy-key', default="mykey",
dest='policy_key',
help='Specify a path to a PEM-encoded private key '
'to use for policy signing.')
option_parser.add_option('-p', '--policy-def', default="device_management",
dest='policy_def',
help='Specify a path to a PEM-encoded private key '
'to use for policy signing.')
option_parser.add_option('-u', '--policy-user', default='user@example.com',
dest='policy_user',
help='Specify the user name the server should '
'report back to the client as the user owning the '
'token used for making the policy request.')
option_parser.add_option('-o', '--output-path', default='.',
dest='output_path',
help='Specifies the directory to output policy '
'files to.')
option_parser.add_option('-t', '--type', default='google/chromeos/device',
dest='policy_type',
help='Specifies the type of policy to create.')
options, args = option_parser.parse_args()
sys.exit(main(options))