blob: 6c1701241e6972b0996bfd90ef9d7afbf3a8c57f [file] [log] [blame]
import json
import logging
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.enterprise import enterprise_policy_utils
from autotest_lib.client.cros.enterprise.policy_group import AllPolicies
# Top-level keys used to index the raw policy data that is read back from the
# DUT (see Policy_Manager.obtain_policies_from_device).
CHROMEPOLICIES = 'chromePolicies'
DEVICELOCALACCOUNT = 'deviceLocalAccountPolicies'
EXTENSIONPOLICIES = 'extensionPolicies'

# Maps a Chrome OS policy name to the name of the matching ARC clouddpc
# setting. Only policies listed here are copied into the expected clouddpc
# settings by Policy_Manager._add_shared_policies.
CHROMEOS_CLOUDDPC = {
    'AudioCaptureAllowed': 'unmuteMicrophoneDisabled',
    'DefaultGeolocationSetting': 'shareLocationDisabled',
    'DeviceBlockDevmode': 'debuggingFeaturesDisabled',
    'DisableScreenshots': 'screenCaptureDisabled',
    'ExternalStorageDisabled': 'usbFileTransferDisabled',
    'VideoCaptureAllowed': 'cameraDisabled',
}
class Policy_Manager(object):
    """
    Central helper for enterprise policy tests.

    This class handles:
        Setting policies on a fake DM server.
        Obtaining policies from a DUT.
        Obtaining clouddpc policy settings.
        Obtaining both set/received policies in several data formats.
        Verifying that the policies provided to the DUT are correct.

    The features are independent of each other, meaning a fake_dm_server is
    not required to obtain policies from the DUT, get clouddpc values, etc.

    """

    def __init__(self, username=None, fake_dm_server=None):
        """
        @param username: username to be used when creating the fake DM policy.
        @param fake_dm_server: The fake DM server object.

        """
        # NOTE(review): the positional True presumably marks this container
        # as holding *configured* (as opposed to obtained) policies; confirm
        # against policy_group.AllPolicies.
        self._configured = AllPolicies(True)
        self._obtained = AllPolicies()
        self.CHROME = 'chrome'
        self.LOCAL = 'local'
        self.EXT = 'ext'
        self.username = username
        self.fake_dm_server = fake_dm_server
        self.autotest_ext = None
        # Raw response of the last obtain_policies_from_device() call.
        self.raw_data = None

        # If a fake_dm_server is provided, enable auto_update by default.
        # Can be turned off if desired.
        self._auto_updateDM = bool(fake_dm_server)

    def configure_policies(self,
                           user=None,
                           suggested_user=None,
                           device=None,
                           extension=None,
                           new=True):
        """
        Used to configure desired policies on the DUT. Will also save the
        configured policies.

        If a fake_dm_server was provided on class initialization, and
        auto_updateDM is not False, this will also update the fake_dm_server.

        @param user/suggested_user/device/extension: dict of the policies to
            be set. Each defaults to an empty dict. extension must be
            provided with the extension id. e.g.
            {'ExtensionID': {policy_dict}}
        @param new: bool, if True clear all previously configured policies.

        """
        # Build the defaults per call: a `{}` default in the signature would
        # be a single dict shared by (and mutable across) every call.
        user = {} if user is None else user
        suggested_user = {} if suggested_user is None else suggested_user
        device = {} if device is None else device
        extension = {} if extension is None else extension

        if new:
            self._configured = AllPolicies(True)

        self._configured.set_policy(self.CHROME, user, 'user')
        self._configured.set_policy(self.CHROME,
                                    suggested_user,
                                    'suggested_user')
        self._configured.set_policy(self.CHROME, device, 'device')
        self._configured.set_extension_policy(extension)

        if self.auto_updateDM:
            self.updateDMServer()

    def configure_extension_visual_policy(self,
                                          ext_policy=None,
                                          new=True):
        """
        Extensions are... different. The policy set for them often is not the
        actual policy, but a pointer to where the policy data is. This makes
        verifying the extension policy tricky.

        To help with this, this function will allow you to set the 'real'
        policy. Important things to note: This is only useful for verifying
        the policy, and getting the policy as a dictionary (which has a flag
        for which style of extension policies you want to see). The DM server
        will not be set via this.

        @param ext_policy: dict, Extension policy must be provided with the
            extension id:
            {'ExtensionID': {policy_dict}}. Defaults to an empty dict.
        @param new: bool, if True will erase any previously stored VISUAL
            extension data.

        """
        # Fresh dict per call instead of a shared mutable default.
        ext_policy = {} if ext_policy is None else ext_policy

        if new:
            self._configured.ext_values = {}
        # NOTE(review): the positional True presumably selects the "visual"
        # flavour of the extension policy; confirm against
        # AllPolicies.set_extension_policy.
        self._configured.set_extension_policy(ext_policy, True)

    def remove_policy(self,
                      policy_name,
                      policy_type,
                      extID=None):
        """
        Removes the policy from the configured policies. Useful when you want
        to clear a specific policy, but leave the other policies untouched.

        If auto_updateDM is True (thus a fake_dm_server has been provided), the
        dm server will be updated.

        @param policy_name: The policy name
        @param policy_type: The type of policy it is. Valid types:
            "user", "device", "extension", "suggested_user". Anything other
            than "extension" is removed from the chrome policies.
        @param extID: The extension ID, if removing an extension policy.

        @raises error.TestError: if the policy cannot be removed (see
            _removeChromePolicy and _removeExtensionPolicy).

        """
        if policy_type == 'extension':
            self._removeExtensionPolicy(policy_name, extID)
        else:
            self._removeChromePolicy(policy_name)

        if self.auto_updateDM:
            self.updateDMServer()

    def _removeChromePolicy(self, policy_name):
        """
        Attempts to remove the specified policy from the configured chrome
        policies.

        @param policy_name: The policy name.

        @raises error.TestError: If the policy is not in the configured
            policies.

        """
        try:
            del self._configured.chrome[policy_name]
        except KeyError:
            raise error.TestError('Policy {} missing from chrome policies.'
                                  .format(policy_name))

    def _removeExtensionPolicy(self, policy_name, extID):
        """
        Attempts to remove the specified extension policy from the specified
        extension.

        @param policy_name: The policy name.
        @param extID: The ID of the extension the policy belongs to.

        @raises error.TestError: if the extID is not provided, or the policy
            (or the extension itself) is not found in the configured
            extension policies.

        """
        if not extID:
            raise error.TestError(
                'Cannot delete extension policy without extension ID')
        try:
            del self._configured.extension_configured_data[extID][policy_name]
        except KeyError:
            raise error.TestError(
                'Policy {} missing from extension policies.'
                .format(policy_name))

    def obtain_policies_from_device(self, autotest_ext=None):
        """
        Calls the autotest private getAllEnterprisePolicies() API, and saves
        the response.

        @param autotest_ext: The autotest browser extension. If omitted, the
            extension given on a previous call is reused.

        @raises error.TestError: if no autotest extension is available.

        """
        if autotest_ext:
            self.autotest_ext = autotest_ext

        if not self.autotest_ext:
            raise error.TestError('Cannot obtain policies without autotest_ext')

        self.raw_data = enterprise_policy_utils.get_all_policies(
            self.autotest_ext)

        self._obtained.set_policy(self.CHROME, self.raw_data[CHROMEPOLICIES])
        self._obtained.set_policy(self.LOCAL, self.raw_data[DEVICELOCALACCOUNT])
        self._obtained.set_extension_policy(self.raw_data[EXTENSIONPOLICIES])

    def verify_policy(self, policyName, policy_value, extID=None):
        """
        Verifies a single policy on the DUT has the expected value. The
        policies are re-read from the DUT before comparing.

        @param policyName: str, the name of the policy.
        @param policy_value: The value the policy is expected to have.
        @param extID: The ID of the extension, if it is an extension policy.

        @raises error.TestError: if the value read from the DUT differs from
            policy_value.

        """
        received_value = self.get_policy_value_from_DUT(policyName=policyName,
                                                        extID=extID,
                                                        refresh=True)

        if not received_value == policy_value:
            # The message carries one placeholder per argument so that the
            # received value is actually reported.
            raise error.TestError(
                'Policy {} value was not set correctly. \nExpected:\t {}'
                '\nReceived: \t {}'.format(policyName,
                                           policy_value,
                                           received_value))
        logging.info('Policy verification successful')

    def verify_policies(self):
        """
        Verifies the configured policies are == to the policies obtained.

        @raises error.TestError: if the two sets of policies differ.

        """
        # Deliberately `not ==` rather than `!=`: only the equality operator
        # of the policy container is relied upon.
        if not self._configured == self._obtained:
            raise error.TestError(
                'Configured policies did not match policies received from DUT.')
        logging.info('Policy verification successful')

    def get_policy_value_from_DUT(self, policyName, extID=None, refresh=False):
        """
        Get the value of a specified policy from the DUT. If the policy is from
        an extension, an extension ID (extID) must be provided.

        @param policyName: str, the name of the policy.
        @param extID: The ID of the extension.
        @param refresh: bool, if you want to get the policies from the DUT again
            Note: This does NOT force the DUT to re-obtain the policies from the
            DM server.

        @returns: The value of the policy if found, else None. An extID that
            is unknown to the obtained policies also yields None.

        """
        if refresh:
            # Uses the previously provided autotest Extension.
            self.obtain_policies_from_device()

        if extID:
            try:
                ext_policies = self._obtained.extension_configured_data[extID]
            except KeyError:
                # No policies were obtained for this extension at all.
                return None
            if policyName in ext_policies:
                return ext_policies[policyName].value
            return None

        if policyName in self._obtained.chrome:
            return self._obtained.chrome[policyName].value
        return None

    def updateDMServer(self):
        """
        Updates the Fake DM server with the current configured policy.

        @raises error.TestError: if no fake DM server was provided.

        """
        fake_dm_json = self.getDMConfig()
        logging.info('Policy blob {}'.format(fake_dm_json))

        if not self.fake_dm_server:
            raise error.TestError(
                'Cannot update DM server. DM server not provided')
        self.fake_dm_server.setup_policy(fake_dm_json)

    def getDMConfig(self, refresh=True):
        """
        Creates the DM configuration (aka Json blob) to be used by the
        fake DM server.

        @param refresh: bool, if True, a new fake DM server json is created
            before it is updated. If False, the existing one is updated in
            place.

        @returns: str, the DM configuration serialized as JSON, with
            'policy_user' set to this manager's username.

        """
        if refresh:
            self._configured.createNewFakeDMServerJson()
        self._configured.updateDMJson()
        self._configured._DMJSON['policy_user'] = self.username
        return json.dumps(self._configured._DMJSON)

    def getCloudDpc(self):
        """
        Gets the Cloud DPC ARC policy settings.

        @returns: dict of the clouddpc settings expected from the currently
            configured policies.

        """
        expected_cloud_dpc_settings = {}
        if self._arc_certs():
            self._add_arc_certs(expected_cloud_dpc_settings)
        self._add_shared_policies(expected_cloud_dpc_settings)
        self._add_shared_arc_policy(expected_cloud_dpc_settings)
        return expected_cloud_dpc_settings

    def _arc_certs(self):
        """
        Returns True if ArcCertificatesSyncMode is set in the configured
        policies and the bool(value) is True, else False.

        """
        chrome_policies = self._configured.chrome
        return bool('ArcCertificatesSyncMode' in chrome_policies and
                    chrome_policies['ArcCertificatesSyncMode'].value)

    def _add_shared_arc_policy(self, dpc):
        """
        Adds the shared policies that are subset within the 'ArcPolicy'.

        @param dpc: dict, updated in place with the shared ArcPolicy keys.

        """
        arc_policy = self._configured.chrome.get('ArcPolicy')
        if not arc_policy:
            return

        arc_settings = arc_policy.value
        if not arc_settings:
            # An unset/None/empty ArcPolicy value has nothing to share.
            return

        for key in ['applications', 'accountTypesWithManagementDisabled']:
            if key in arc_settings:
                dpc[key] = arc_settings[key]

    def _add_shared_policies(self, dpc):
        """
        Add all of the configured policies that are shared with arc clouddpc,
        to the "dpc" dict, with the clouddpc key. If the policy is not set, it
        will not be added.

        @param dpc: dict, updated in place.

        """
        for policy_name, dpc_name in CHROMEOS_CLOUDDPC.items():
            if policy_name in self._configured.chrome:
                dpc[dpc_name] = self._configured.chrome[policy_name].value

    def _add_arc_certs(self, dpc):
        """
        Sets dpc['caCerts'] to the configured OpenNetworkConfiguration policy
        value, or to None when that policy is not configured.

        @param dpc: dict, updated in place.

        """
        open_network_config = 'OpenNetworkConfiguration'
        if open_network_config in self._configured.chrome:
            dpc['caCerts'] = self._configured.chrome[open_network_config].value
        else:
            dpc['caCerts'] = None

    def get_configured_policies_as_dict(self, visual=False):
        """
        Returns the configured policies as a dict.

        @param visual: bool, forwarded to the policy container to select
            which style of extension policies is returned.

        """
        return self._configured.get_policy_as_dict(visual)

    def get_obtained_policies_as_dict(self):
        """Returns the obtained policies as a dict."""
        return self._obtained.get_policy_as_dict(visual=True)

    @property
    def auto_updateDM(self):
        """Returns the current state of the auto_updateDM setting."""
        return self._auto_updateDM

    @auto_updateDM.setter
    def auto_updateDM(self, value):
        """
        Turns on/off auto updating of the DM server.

        @param value: bool, the new setting.

        @raises error.TestError: if value is not a bool, or if it is True
            while no fake DM server is configured.

        """
        if not isinstance(value, bool):
            raise error.TestError('Auto Update DM must be bool, got {}'
                                  .format(value))
        if value and not self.fake_dm_server:
            raise error.TestError(
                'Cannot autoupdate without the Fake DM server configured.')
        self._auto_updateDM = value