| # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import copy |
| import json |
| |
| from autotest_lib.client.cros.enterprise.policy import Policy as Policy |
| from autotest_lib.client.cros.enterprise.device_policy_lookup import DEVICE_POLICY_DICT |
| |
# Top-level keys of the dictionary returned by
# AllPolicies.get_policy_as_dict(); each key holds one bucket of policies.
CHROMEPOLICIES = 'chromePolicies'
EXTENSIONPOLICIES = 'extensionPolicies'
DEVICELOCALACCOUNT = 'deviceLocalAccountPolicies'
| |
| |
class AllPolicies(object):
    """Groups chrome, extension and device-local-account policies.

    Policies are stored as Policy objects in four buckets:
        self.chrome
        self.local
        self.extension_configured_data   (keyed by extension ID)
        self.extension_displayed_values  (keyed by extension ID)

    When created with isConfiguredPolicies=True the object also owns a
    blob (self._DMJSON) intended for the Fake DM server.
    """

    def __init__(self, isConfiguredPolicies=False):
        """
        Create the empty policy buckets.

        @param isConfiguredPolicies: bool, True when this object represents
                the policies being SET for testing. In that case a fresh
                Fake DM server blob is created as well.

        """
        self.extension_configured_data = {}
        # Because Extensions have a different "displayed" value than the
        # configured, the extension_displayed_values will be used when the
        # policy group is "configured" to represent what the "displayed"
        # policy values SHOULD be.
        self.extension_displayed_values = {}
        self.local = {}
        self.chrome = {}

        self.policy_dict = {CHROMEPOLICIES: {},
                            EXTENSIONPOLICIES: {},
                            DEVICELOCALACCOUNT: {}}
        self.isConfiguredPolicies = isConfiguredPolicies
        if self.isConfiguredPolicies:
            self.createNewFakeDMServerJson()

    def createNewFakeDMServerJson(self):
        """Creates a fresh DM blob that will be used by the Fake DM server."""

        self._DMJSON = {
            'managed_users': ['*'],
            'policy_user': None,
            'current_key_index': 0,
            'invalidation_source': 16,
            'invalidation_name': 'test_policy',
            'google/chromeos/user': {'mandatory': {}, 'recommended': {}},
            'google/chromeos/device': {},
            'google/chrome/extension': {}
        }
        # Shortcuts into the blob. They alias the nested dicts, so updating
        # a shortcut updates self._DMJSON as well.
        self._DM_MANDATORY = self._DMJSON['google/chromeos/user']['mandatory']
        self._DM_RECOMMENDED = (
            self._DMJSON['google/chromeos/user']['recommended'])
        self._DM_DEVICE = self._DMJSON['google/chromeos/device']
        self._DM_EXTENSION = self._DMJSON['google/chrome/extension']

    def get_policy_as_dict(self, visual=False):
        """
        Returns the policies as a dictionary.

        @param visual: bool, if True (and displayed extension values have
                been set) the displayed extension values are used instead
                of the configured ones.

        @returns: self.policy_dict, refreshed with the current values. This
                is the object's own dictionary, not a copy.

        """
        self._update_policy_dict(visual)
        return self.policy_dict

    def set_extension_policy(self, policies, visual=False):
        """
        Sets the extension policies

        @param policies: Dict formatted as follows:
            {'extID': {pol1: v1}, extid2: {p2:v2}}

        @param visual: bool, If the extension policy provided is what should
            be displayed via the api/chrome page. If False, then the
            'policies' should be the actual value that will be provided to
            the DMServer.

        """
        # If the policy is configured (ie this policy group object
        # represents the policies being SET for testing) add the
        # 'policy_group' value.
        policy_group = 'extension' if self.isConfiguredPolicies else None
        # Work on a copy so the caller's dictionary is never shared.
        extension_policies = copy.deepcopy(policies)

        for extension_ID, extension_policy in extension_policies.items():
            # Adding the extension ID key into the extension dict. Any
            # policies previously stored for this extension are dropped.
            if visual:
                self.extension_displayed_values[extension_ID] = {}
                key = 'displayed_ext_values'
            else:
                self.extension_configured_data[extension_ID] = {}
                key = 'ext_values'
            self.set_policy(key, extension_policy, policy_group, extension_ID)

    def set_policy(self,
                   policy_type,
                   policies,
                   group=None,
                   extension_key=None):
        """
        Create the policy objects, and set them in the corresponding group.

        @param policy_type: str of the policy type. Must be:
            'chrome', 'ext_values', 'displayed_ext_values', or 'local'.
        @param policies: dict of policy values.
        @param group: str, group key for the Policy object setter.
        @param extension_key: optional, key for the extension ID.

        @raises ValueError: if policy_type is not one of the listed types.

        """
        policy_group = self._get_policy_group(policy_type, extension_key)
        for name, value in policies.items():
            policy_group[name] = self._create_pol_obj(name, value, group)

    def _get_policy_group(self, policy_type, extension_key=None):
        """
        Simple lookup to put the policies in the correct bucket.

        @param policy_type: str, one of 'chrome', 'ext_values',
            'displayed_ext_values' or 'local'.
        @param extension_key: extension ID, used by the two extension types.

        @returns: the dict the policies of that type are stored in.

        @raises ValueError: if policy_type is not recognised.
        @raises KeyError: if the extension ID has no bucket yet.

        """
        if policy_type == 'chrome':
            return self.chrome
        elif policy_type == 'ext_values':
            return self.extension_configured_data[extension_key]
        elif policy_type == 'displayed_ext_values':
            return self.extension_displayed_values[extension_key]
        elif policy_type == 'local':
            return self.local
        # Fail loudly rather than with an UnboundLocalError.
        raise ValueError(
            "Unknown policy_type %r. Must be 'chrome', 'ext_values', "
            "'displayed_ext_values', or 'local'." % (policy_type,))

    def updateDMJson(self):
        """
        Update the ._DMJSON with the values currently set in
        self.chrome and self.extension_configured_data.

        The blob must already exist, see createNewFakeDMServerJson().

        """

        self._populateChromeData()
        self._populateExtensionData()

    def _populateChromeData(self):
        """Update the DM_JSON's chrome values."""
        for policy_name, policy_object in self.chrome.items():
            if policy_object.scope == 'machine':
                # Device policies are stored under the name the lookup
                # table maps them to.
                dm_name = DEVICE_POLICY_DICT[policy_name]
                self._DM_DEVICE.update(
                    self._clean_pol({dm_name: policy_object.value}))

            elif policy_object.level == 'recommended':
                self._DM_RECOMMENDED.update(
                    self._clean_pol({policy_name: policy_object.value}))

            elif (policy_object.level == 'mandatory' and
                  policy_object.scope == 'user'):
                self._DM_MANDATORY.update(
                    self._clean_pol({policy_name: policy_object.value}))

    def _populateExtensionData(self):
        """Updates the DM_JSON's extension values."""
        for extension, ext_pol in self.extension_configured_data.items():
            # Extension values are passed through as-is (not cleaned).
            self._DM_EXTENSION[extension] = dict(
                (polname, polItem.value)
                for polname, polItem in ext_pol.items())

    def _clean_pol(self, policies):
        """
        Cleans the policies to be set on the fake DM server.

        @param policies: dict of policy name to value.

        @returns: new dict without the policies whose value is None, and
            with the remaining values passed through _jsonify().

        """
        cleaned = {}
        for policy, value in policies.items():
            if value is None:
                continue
            cleaned[policy] = self._jsonify(policy, value)
        return cleaned

    def _jsonify(self, policy, value):
        """
        Jsonify the value if it is a dict, or a list of dicts that is not
        the kiosk policy.

        @param policy: str, name of the policy.
        @param value: value of the policy.

        @returns: json string for the cases above, else the value untouched.

        """
        if isinstance(value, dict):
            return json.dumps(value)
        # Kiosk Policy, aka "account", is the only policy not formatted.
        if (isinstance(value, list) and
                policy != 'device_local_accounts.account'):
            # Only the first item decides if the list is one of dicts.
            if value and isinstance(value[0], dict):
                return json.dumps(value)
        return value

    def _create_pol_obj(self, name, data, group=None):
        """
        Create a policy object from Policy.Policy().

        @param name: str, name of the policy
        @param data: data value of the policy
        @param group: optional, group of the policy.

        @returns: Policy object, representing the policy args provided.
        """
        policy_obj = Policy()
        policy_obj.name = name
        if policy_obj.is_formatted_value(data):
            policy_obj.set_policy_from_dict(data)
        else:
            policy_obj.value = data
        policy_obj.group = group
        return policy_obj

    def _update_policy_dict(self, secondary_ext_policies):
        """
        Update the local .policy_dict with the most current values.

        Entries are only added or overwritten; nothing is removed from
        .policy_dict.

        @param secondary_ext_policies: bool, passed to _select_ext_group()
            to pick which extension values are used.

        """
        for policy in self.chrome:
            self.policy_dict[CHROMEPOLICIES].update(
                self.chrome[policy].get_policy_as_dict())

        ext_item = self._select_ext_group(secondary_ext_policies)

        for ext_name, ext_group in ext_item.items():
            ext_dict = {ext_name: {}}
            for policy in ext_group:
                pol_as_dict = ext_group[policy].get_policy_as_dict()
                ext_dict[ext_name].update(pol_as_dict)
            self.policy_dict[EXTENSIONPOLICIES].update(ext_dict)

        for policy in self.local:
            self.policy_dict[DEVICELOCALACCOUNT].update(
                self.local[policy].get_policy_as_dict())

    def _select_ext_group(self, secondary_ext_policies):
        """Determine which extension group to use for the configured
        dictionary formatting. If the secondary_ext_policies flag has been
        set, and the self.extension_displayed_values is not empty, use
        self.extension_displayed_values,
        else: use the original configured

        @param secondary_ext_policies: bool

        """
        if secondary_ext_policies and self.extension_displayed_values:
            return self.extension_displayed_values
        return self.extension_configured_data

    def __ne__(self, other):
        """Inverse of __eq__ (required explicitly on Python 2)."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __eq__(self, other):
        """
        Override the == to check a policy group object vs another.

        The check is one-directional: every policy set in self must be
        matched by other, extra policies in other are ignored. Extensions
        are always compared against other.extension_configured_data, using
        self.extension_displayed_values when those are set.

        Will return False if:
            A policy from self is missing in other, when the policy is
                not None.
            An Extension from self is missing in other.
            If the policy values in self are not equal to the other
                (less obfuscation).

        Else: True

        Returns NotImplemented when other is not an AllPolicies object.
        """
        if not isinstance(other, AllPolicies):
            return NotImplemented

        own_ext = self.extension_configured_data
        if self.extension_displayed_values:
            own_ext = self.extension_displayed_values
        other_ext = other.extension_configured_data

        for ext_name, ext_group in own_ext.items():
            if ext_name not in other_ext:
                return False
            if not self._check(ext_group, other_ext[ext_name]):
                return False

        return (self._check(self.chrome, other.chrome) and
                self._check(self.local, other.local))

    def _check(self, policy_group, other_policy_group):
        """
        Check if the policy_group is ==.

        Will return False if:
            policy is missing from other policy object
            policy objects != (per the Policy object __eq__ override)
        Will return True if:
            There are no policies
            If no other conditions are violated

        Policies whose value is None are skipped.

        @param policy_group: dict of policy name to policy object.
        @param other_policy_group: dict of policy name to policy object,
            that the policy_group is compared against.

        """
        if not policy_group:  # No object
            return True
        for policy_name, policy_obj in policy_group.items():
            # An unset policy is ignored, but the remaining policies of
            # the group still have to be compared.
            if policy_obj.value is None:
                continue
            if policy_name not in other_policy_group:
                return False
            if policy_obj != other_policy_group[policy_name]:
                return False
        return True