blob: aa1dff3cc981a09043524e17f8c1e831a00014a9 [file] [log] [blame]
#!/usr/bin/env python2
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Transforms and validates cros config from source YAML to target JSON"""
from __future__ import print_function

import argparse
import collections
import copy
import json
import os
import re
import sys

from jsonschema import validate
import yaml

try:
  from collections.abc import Mapping  # Python 3.3+
except ImportError:
  from collections import Mapping  # Python 2
# Directory containing this script; Main() looks here for the default schema.
this_dir = os.path.dirname(__file__)
# Keys of the source YAML / target JSON structure.
CHROMEOS = 'chromeos'
CONFIGS = 'configs'
DEVICES = 'devices'
PRODUCTS = 'products'
SKUS = 'skus'
CONFIG = 'config'
# Slash-separated key paths, relative to a single config entry, that
# _FilterBuildElements() removes from the output.
BUILD_ONLY_ELEMENTS = [
'/firmware', '/firmware-signing', '/audio/main/files', '/touch/files',
'/arc/files', '/thermal/files'
]
# Top-level config keys that are blanked out before whitelabel configs are
# compared with each other in _ValidateWhitelabelBrandChangesOnly().
BRAND_ELEMENTS = ['brand-code', 'firmware-signing', 'wallpaper']
# Matches a '{{name}}' template reference and captures the name between the
# braces.
TEMPLATE_PATTERN = re.compile('{{([^}]*)}}')
def GetNamedTuple(mapping):
  """Converts a mapping into a named tuple recursively.

  Keys become field names after replacing '-' and '@' with '_'. List values
  stay lists, with every element converted recursively.

  Args:
    mapping: A mapping object to be converted. Any value that is not a
      mapping is returned unchanged.

  Returns:
    A named tuple (type name 'Config') generated from mapping.
  """
  if not isinstance(mapping, Mapping):
    return mapping
  fields = {}
  # items() instead of iteritems() so this works on both Python 2 and 3.
  for key, value in mapping.items():
    field_name = key.replace('-', '_').replace('@', '_')
    if isinstance(value, list):
      fields[field_name] = [GetNamedTuple(item) for item in value]
    else:
      fields[field_name] = GetNamedTuple(value)
  return collections.namedtuple('Config', list(fields))(**fields)
def MergeDictionaries(primary, overlay):
  """Merges the overlay dictionary onto the primary dictionary in place.

  If an element doesn't exist in primary, it's added.
  If both elements are mappings, they are merged recursively.
  If both elements are lists, the overlay list is appended to the primary one.
  Otherwise (including when the two values have different container types),
  the overlay value takes precedence.

  Values taken from overlay are deep-copied, so later merges into primary
  never modify overlay or another dictionary that received the same overlay.

  Args:
    primary: Primary dictionary; modified in place.
    overlay: Overlay dictionary; left untouched.
  """
  for key, overlay_value in overlay.items():
    if key not in primary:
      primary[key] = copy.deepcopy(overlay_value)
      continue
    primary_value = primary[key]
    if (isinstance(overlay_value, Mapping) and
        isinstance(primary_value, Mapping)):
      MergeDictionaries(primary_value, overlay_value)
    elif isinstance(overlay_value, list) and isinstance(primary_value, list):
      primary_value.extend(copy.deepcopy(overlay_value))
    else:
      # Scalars, or a container replacing a value of a different kind.
      primary[key] = copy.deepcopy(overlay_value)
def _ParseBoolArg(value):
  """Interprets a command line string as a boolean.

  argparse's type=bool calls bool() on the raw string, which is True for
  every non-empty value, including 'False'. This converter keeps all other
  inputs behaving as before but honours the explicit false spellings.

  Args:
    value: String supplied on the command line.

  Returns:
    False for an empty string or a case-insensitive 'false', 'f', 'no', 'n',
    'off' or '0'; True for any other string.
  """
  return value.lower() not in ('', 'false', 'f', 'no', 'n', 'off', '0')
def ParseArgs(argv):
  """Parse the available arguments.

  Invalid arguments or -h cause this function to print a message and exit.

  Args:
    argv: List of string arguments (excluding program name / argv[0])

  Returns:
    argparse.Namespace object containing the attributes schema, config,
    configs, output, generated_c_output and filter.
  """
  parser = argparse.ArgumentParser(
      description='Validates a YAML cros-config and transforms it to JSON')
  parser.add_argument(
      '-s',
      '--schema',
      type=str,
      help='Path to the schema file used to validate the config')
  parser.add_argument(
      '-c',
      '--config',
      type=str,
      help='Path to the YAML config file that will be validated/transformed')
  parser.add_argument(
      '-m',
      '--configs',
      nargs='+',
      type=str,
      help='Path to the YAML config file(s) that will be validated/transformed')
  parser.add_argument(
      '-o',
      '--output',
      type=str,
      help='Output file that will be generated by the transform (system file)')
  parser.add_argument(
      '-g',
      '--generated_c_output',
      type=str,
      help='Output file that will contain generated c bindings of the config.')
  parser.add_argument(
      '-f',
      '--filter',
      # Not type=bool: bool('False') is True.
      type=_ParseBoolArg,
      default=False,
      help='Filter build specific elements from the output JSON')
  return parser.parse_args(argv)
def _SetTemplateVars(template_input, template_vars):
"""Builds a map of template variables by walking the input recursively.
Args:
template_input: A mapping object to be walked.
template_vars: A mapping object built up while walking the template_input.
"""
to_walk = []
for key, val in template_input.iteritems():
if isinstance(val, collections.Mapping):
to_walk.append(val)
elif type(val) is not list:
template_vars[key] = val
# Do this last so all variables from the parent are in scope first.
for val in to_walk:
_SetTemplateVars(val, template_vars)
def _GetVarTemplateValue(val, template_input, template_vars):
  """Resolves every '{{name}}' reference found in a single string.

  Args:
    val: The string to evaluate.
    template_input: The mapping val came from; only used in the error message.
    template_vars: A mapping of all the variable values available.

  Returns:
    val with each reference replaced by its variable value. When val consists
    of exactly one reference to an int variable, the int itself is returned
    instead of a string.

  Raises:
    ValidationError: if val references a variable missing from template_vars.
  """
  for name in TEMPLATE_PATTERN.findall(val):
    placeholder = '{{%s}}' % name
    if name not in template_vars:
      formatted_vars = json.dumps(template_vars, sort_keys=True, indent=2)
      formatted_input = json.dumps(template_input, sort_keys=True, indent=2)
      raise ValidationError("Referenced template variable '%s' doesn't "
                            "exist string '%s'.\nInput:\n %s\nVariables:\n%s" %
                            (name, val, formatted_input, formatted_vars))
    resolved = template_vars[name]
    # Templates are strings, but some targets (in practice 'sku-id') must stay
    # ints, so int variables get special treatment.
    if not isinstance(resolved, int):
      val = val.replace(placeholder, resolved)
    elif val == placeholder:
      # The whole value was a single int reference: hand back the int.
      val = resolved
    else:
      val = val.replace(placeholder, str(resolved))
  return val
def _ApplyTemplateVars(template_input, template_vars):
  """Evals the input and applies the templating schema using the provided vars.

  String values, and string elements of lists, are resolved in place. Nested
  mappings are processed recursively with the same variables. Mappings found
  inside lists get their own copy of the variables, extended with the values
  that list item defines.

  Args:
    template_input: Input that will be updated based on the templating schema.
    template_vars: A mapping of all the variables values available.
  """
  # str and unicode on Python 2, just str on Python 3 (replaces basestring).
  string_types = (str, type(u''))
  nested_maps = []
  list_items = []
  for key in list(template_input):
    val = template_input[key]
    if isinstance(val, Mapping):
      nested_maps.append(val)
    elif isinstance(val, list):
      for index, list_val in enumerate(val):
        if isinstance(list_val, Mapping):
          list_items.append(list_val)
        elif isinstance(list_val, string_types):
          val[index] = _GetVarTemplateValue(list_val, template_input,
                                            template_vars)
    elif isinstance(val, string_types):
      template_input[key] = _GetVarTemplateValue(val, template_input,
                                                 template_vars)
  # Do this last so all variables from the parent are in scope first.
  for value in nested_maps:
    _ApplyTemplateVars(value, template_vars)
  # Object lists need their variables put in scope on a per list item basis
  for value in list_items:
    list_item_vars = copy.deepcopy(template_vars)
    _SetTemplateVars(value, list_item_vars)
    # Variables may reference other variables; resolve until none are left.
    while _HasTemplateVariables(list_item_vars):
      _ApplyTemplateVars(list_item_vars, list_item_vars)
    _ApplyTemplateVars(value, list_item_vars)
def _DeleteTemplateOnlyVars(template_input):
"""Deletes all variables starting with $
Args:
template_input: Input that will be updated based on the templating schema.
"""
to_delete = []
for key in template_input.keys():
val = template_input[key]
if isinstance(val, collections.Mapping):
_DeleteTemplateOnlyVars(val)
elif type(val) is list:
for v in val:
if isinstance(v, collections.Mapping):
_DeleteTemplateOnlyVars(v)
elif key.startswith('$'):
to_delete.append(key)
for key in to_delete:
del template_input[key]
def _HasTemplateVariables(template_vars):
  """Checks if there are any unevaluated template variables.

  Only the direct values of template_vars are inspected; non-string values
  are ignored.

  Args:
    template_vars: A mapping of all the variables values available.

  Returns:
    True if at least one string value still contains a '{{...}}' reference,
    False otherwise.
  """
  # str and unicode on Python 2, just str on Python 3 (replaces basestring).
  string_types = (str, type(u''))
  return any(
      isinstance(val, string_types) and
      TEMPLATE_PATTERN.search(val) is not None
      for val in template_vars.values())
def TransformConfig(config):
  """Transforms the source config (YAML) to the target system format (JSON)

  Applies consistent transforms to convert a source YAML configuration into
  JSON output that will be used on the system by cros_config.

  When the source has a 'devices' section, one config is produced per
  product/SKU combination of each device, with template variables applied.
  Otherwise the 'configs' list is passed through unchanged.

  Args:
    config: Config that will be transformed.

  Returns:
    Resulting JSON output from the transform.
  """
  # safe_load: the config is plain data, so arbitrary object construction is
  # never wanted, and newer PyYAML rejects load() without an explicit Loader.
  config_yaml = yaml.safe_load(config)
  # Round trip through JSON so only JSON-native types remain.
  json_from_yaml = json.dumps(config_yaml, sort_keys=True, indent=2)
  json_config = json.loads(json_from_yaml)
  configs = []
  if DEVICES in json_config[CHROMEOS]:
    for device in json_config[CHROMEOS][DEVICES]:
      # NOTE: one variable map per device; values set while processing an
      # earlier product/SKU stay visible to later ones unless overwritten.
      template_vars = {}
      for product in device.get(PRODUCTS, [{}]):
        for sku in device[SKUS]:
          # Template variables scope is config, then device, then product
          # This allows shared configs to define defaults using anchors, which
          # can then be easily overridden by the product/device scope.
          _SetTemplateVars(sku, template_vars)
          _SetTemplateVars(device, template_vars)
          _SetTemplateVars(product, template_vars)
          # Variables may reference other variables; resolve until none left.
          while _HasTemplateVariables(template_vars):
            _ApplyTemplateVars(template_vars, template_vars)
          sku_clone = copy.deepcopy(sku)
          _ApplyTemplateVars(sku_clone, template_vars)
          sku_config = sku_clone[CONFIG]
          _DeleteTemplateOnlyVars(sku_config)
          configs.append(sku_config)
  else:
    configs = json_config[CHROMEOS][CONFIGS]
  # Drop everything except for configs since they were just used as shared
  # config in the source yaml.
  json_config = {CHROMEOS: {CONFIGS: configs}}
  return _FormatJson(json_config)
def GenerateCBindings(config):
  """Generates C struct bindings

  Generates C struct bindings that can be used by mosys. Each config becomes
  one struct initializer; configs whose identity has a
  'device-tree-compatible-match' use the ARM layout, all others the x86 one.

  Args:
    config: Config (transformed) that is the transform basis.

  Returns:
    The generated C source as a string.
  """
  x86_template = '''
{.platform_name = "%s",
.smbios_match_name = "%s",
.sku_id = %s,
.customization_id = "%s",
.whitelabel_tag = "%s",
.info = {.brand = "%s",
.model = "%s",
.customization = "%s",
.signature_id = "%s"}}'''
  arm_template = '''
{.platform_name = "%s",
.device_tree_compatible_match = "%s",
.customization_id = "%s",
.whitelabel_tag = "%s",
.info = {.brand = "%s",
.model = "%s",
.customization = "%s",
.signature_id = "%s"}}'''
  entries = []
  parsed = json.loads(config)
  for entry in parsed[CHROMEOS][CONFIGS]:
    identity = entry['identity']
    model = entry['name']
    whitelabel_tag = identity.get('whitelabel-tag', '')
    customization_id = identity.get('customization-id', '')
    # Fall back through the identifiers, ending at the model name.
    customization = customization_id or whitelabel_tag or model
    signing = entry.get('firmware-signing', {})
    signature_id = signing.get('signature-id', '') or model
    brand_code = entry.get('brand-code', '')
    platform_name = identity.get('platform-name', '')
    dt_match = identity.get('device-tree-compatible-match', '')
    # Fields shared by both layouts, in struct order.
    common = (customization_id, whitelabel_tag, brand_code, model,
              customization, signature_id)
    if dt_match:
      entries.append(arm_template % ((platform_name, dt_match) + common))
    else:
      smbios = (platform_name,
                identity.get('smbios-name-match', ''),
                identity.get('sku-id', -1))
      entries.append(x86_template % (smbios + common))
  file_template = '''\
#include "lib/cros_config_struct.h"
static struct config_map all_configs[] = {%s
};
const struct config_map *cros_config_get_config_map(int *num_entries) {
*num_entries = %s;
return &all_configs[0];
}
'''
  return file_template % (',\n'.join(entries), len(entries))
def _FormatJson(config):
"""Formats JSON for output or printing.
Args:
config: Dictionary to be output
"""
# Work around bug in json dumps that adds trailing spaces with indent set.
return re.sub(
', $',
',',
json.dumps(config, sort_keys=True, indent=2), flags=re.MULTILINE)
def FilterBuildElements(config):
  """Removes build only elements from the schema.

  Strips every path listed in BUILD_ONLY_ELEMENTS from each config, in
  preparation for the platform.

  Args:
    config: Config (transformed) that will be filtered

  Returns:
    The filtered config as formatted JSON text.
  """
  parsed = json.loads(config)
  for entry in parsed[CHROMEOS][CONFIGS]:
    # Paths are relative to each individual config, hence the empty root.
    _FilterBuildElements(entry, '')
  return _FormatJson(parsed)
def _FilterBuildElements(config, path):
  """Recursively checks and removes build only elements.

  Args:
    config: Dict that will be checked; modified in place.
    path: Slash-separated path of config within the overall config entry
      ('' for the entry itself).
  """
  # Walk a snapshot of the keys so entries can be removed along the way.
  for key in list(config):
    child_path = '%s/%s' % (path, key)
    if child_path in BUILD_ONLY_ELEMENTS:
      config.pop(key)
    elif isinstance(config[key], dict):
      _FilterBuildElements(config[key], child_path)
def ValidateConfigSchema(schema, config):
  """Validates a transformed cros config against the schema specified

  Verifies that the config complies with the schema supplied. Any error
  raised by jsonschema's validate() propagates to the caller.

  Args:
    schema: Source schema (YAML text) used to verify the config.
    config: Config (transformed JSON text) that will be verified.
  """
  json_config = json.loads(config)
  # safe_load: the schema is plain data, so arbitrary object construction is
  # never wanted, and newer PyYAML rejects load() without an explicit Loader.
  schema_yaml = yaml.safe_load(schema)
  # Round trip through JSON so only JSON-native types reach the validator.
  schema_json_from_yaml = json.dumps(schema_yaml, sort_keys=True, indent=2)
  schema_json = json.loads(schema_json_from_yaml)
  validate(json_config, schema_json)
class ValidationError(Exception):
  """Raised when a config violates one of the validation rules."""
def _ValidateUniqueIdentities(json_config):
"""Verifies the identity tuple is globally unique within the config.
Args:
json_config: JSON config dictionary
"""
identities = [str(config['identity'])
for config in json_config['chromeos']['configs']]
if len(identities) != len(set(identities)):
raise ValidationError('Identities are not unique: %s' % identities)
def _ValidateWhitelabelBrandChangesOnly(json_config):
"""Verifies that whitelabel changes are contained to branding information.
Args:
json_config: JSON config dictionary
"""
whitelabels = {}
for config in json_config['chromeos']['configs']:
whitelabel_tag = config['identity'].get('whitelabel-tag', None)
if whitelabel_tag:
name = '%s - %s' % (config['name'], config['identity'].get('sku-id', 0))
config_list = whitelabels.get(name, [])
wl_minus_brand = copy.deepcopy(config)
wl_minus_brand['identity']['whitelabel-tag'] = ''
for brand_element in BRAND_ELEMENTS:
wl_minus_brand[brand_element] = ''
config_list.append(wl_minus_brand)
whitelabels[name] = config_list
# whitelabels now contains a map by device name with all whitelabel
# configs that have had their branding data stripped.
for device_name, configs in whitelabels.iteritems():
base_config = configs[0]
compare_index = 1
while compare_index < len(configs):
compare_config = configs[compare_index]
compare_index = compare_index + 1
base_str = str(base_config)
compare_str = str(compare_config)
if base_str != compare_str:
raise ValidationError(
'Whitelabel configs can only change branding attributes (%s).\n'
'However, the device %s differs by other attributes.\n'
'Example 1: %s\n'
'Example 2: %s' % (device_name,
', '.join(BRAND_ELEMENTS),
base_str,
compare_str))
def ValidateConfig(config):
  """Validates a transformed cros config for general business rules.

  Runs the identity uniqueness check and the whitelabel check, i.e. the
  validation that can't be easily performed using the schema.

  Args:
    config: Config (transformed) that will be verified.
  """
  parsed = json.loads(config)
  for check in (_ValidateUniqueIdentities,
                _ValidateWhitelabelBrandChangesOnly):
    check(parsed)
def _FindImports(config_file, includes):
  """Recursively looks up and finds files to include for yaml.

  Imported files are appended to includes before the file that imports them,
  so config_file itself always comes last. Import paths are resolved relative
  to the directory of the importing file.

  Args:
    config_file: Path to the config file for which to apply imports.
    includes: List that is built up through processing the files.
  """
  working_dir = os.path.dirname(config_file)
  with open(config_file, 'r') as config_stream:
    config_lines = config_stream.readlines()
  yaml_import_lines = []
  found_imports = False
  # Parsing out just the imports snippet is required because the YAML
  # isn't valid until the imports are eval'd
  for line in config_lines:
    if re.match("^imports", line):
      found_imports = True
      yaml_import_lines.append(line)
    elif found_imports:
      if re.match(" *- (.*)", line):
        yaml_import_lines.append(line)
      else:
        # First line that is not a list item ends the imports section.
        break
  if yaml_import_lines:
    # safe_load: the snippet is plain data, and newer PyYAML rejects load()
    # without an explicit Loader.
    yaml_import = yaml.safe_load("\n".join(yaml_import_lines))
    # 'imports:' without any entries parses to None; treat it as empty.
    for import_file in yaml_import.get("imports", []) or []:
      full_path = os.path.join(working_dir, import_file)
      _FindImports(full_path, includes)
  includes.append(config_file)
def ApplyImports(config_file):
  """Parses the imports statements and applies them to a result config.

  Args:
    config_file: Path to the config file for which to apply imports.

  Returns:
    Raw config with the imports applied: the contents of every file found by
    _FindImports, in that order, joined with newlines.
  """
  ordered_files = []
  _FindImports(config_file, ordered_files)
  contents = []
  for path in ordered_files:
    with open(path, 'r') as stream:
      contents.append(stream.read())
  return '\n'.join(contents)
def MergeConfigs(configs):
  """Evaluates and merges all config files into a single configuration.

  The first file is the base. Each config of every later file is merged into
  all base configs it matches, or appended when it matches none. A config
  with an identity matches when each of its identity entries is present with
  the same value in the base config's identity; a config without an identity
  matches by name.

  Args:
    configs: List of source config files that will be transformed/merged.

  Returns:
    Final merged JSON result.
  """
  json_files = []
  for yaml_file in configs:
    yaml_with_imports = ApplyImports(yaml_file)
    json_transformed_file = TransformConfig(yaml_with_imports)
    json_files.append(json.loads(json_transformed_file))
  result_json = json_files[0]
  result_configs = result_json['chromeos']['configs']
  for overlay_json in json_files[1:]:
    for to_merge_config in overlay_json['chromeos']['configs']:
      to_merge_identity = to_merge_config.get('identity', {})
      to_merge_name = to_merge_config.get('name', '')
      matched = False
      # Find all existing configs where there is a full/partial identity
      # match or name match and merge that config into the source.
      # If there are no matches, then append the config.
      for source_config in result_configs:
        identity_match = False
        if to_merge_identity:
          # A source config without an identity simply never matches.
          source_identity = source_config.get('identity', {})
          identity_match = all(
              key in source_identity and source_identity[key] == value
              for key, value in to_merge_identity.items())
        elif to_merge_name:
          identity_match = to_merge_name == source_config.get('name', '')
        if identity_match:
          MergeDictionaries(source_config, to_merge_config)
          matched = True
      if not matched:
        result_configs.append(to_merge_config)
  return _FormatJson(result_json)
def Main(schema,
         config,
         output,
         filter_build_details=False,
         gen_c_bindings_output=None,
         configs=None):
  """Transforms and validates a cros config file for use on the system

  Applies consistent transforms to convert a source YAML configuration into
  a JSON file that will be used on the system by cros_config.

  Verifies that the file complies with the schema verification rules and
  performs additional verification checks for config consistency.

  Args:
    schema: Schema file used to verify the config. Falls back to
      cros_config_schema.yaml next to this script when empty.
    config: Source config file that will be transformed/verified. When set,
      it replaces configs.
    output: Output file that will be generated by the transform. The result
      is printed instead when empty.
    filter_build_details: Whether build only details should be filtered or not.
    gen_c_bindings_output: Output file with generated c bindings.
    configs: List of source config files that will be transformed/verified.
  """
  schema = schema or os.path.join(this_dir, 'cros_config_schema.yaml')
  # TODO(shapiroc): Remove this once we no longer need backwards compatibility
  # for single config parameters.
  if config:
    configs = [config]
  full_json_transform = MergeConfigs(configs)
  with open(schema, 'r') as schema_stream:
    schema_contents = schema_stream.read()
  ValidateConfigSchema(schema_contents, full_json_transform)
  ValidateConfig(full_json_transform)
  # The C bindings below always use the unfiltered transform.
  json_transform = full_json_transform
  if filter_build_details:
    json_transform = FilterBuildElements(full_json_transform)
  if output:
    with open(output, 'w') as output_stream:
      output_stream.write(json_transform)
  else:
    print(json_transform)
  if gen_c_bindings_output:
    with open(gen_c_bindings_output, 'w') as bindings_stream:
      bindings_stream.write(GenerateCBindings(full_json_transform))
def main(_argv=None):
  """Main program which parses args and runs

  Args:
    _argv: Intended to be the list of arguments to the program, or None to use
      sys.argv (but at present this is unused; sys.argv is always parsed)
  """
  args = ParseArgs(sys.argv[1:])
  Main(
      schema=args.schema,
      config=args.config,
      output=args.output,
      filter_build_details=args.filter,
      gen_c_bindings_output=args.generated_c_output,
      configs=args.configs)
if __name__ == '__main__':
  main()