blob: 7b5e3d66d580762917993f396e1db7d3ce524613 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import json
import os
class ConfigJsonIteratorError(Exception):
""""Exception for config json iterator"""
pass
class ConfigJsonIterator(object):
"""Class to consolidate multiple config json files.
This class reads and combines input JSON instances into one based on the
following rules:
1. "_deps" value in the root config file contains a list of common config
file paths. Each path represents a RELATIVE path to
the root config file.
For example (common.config is in the same directory as root.config):
root.config:
{ "a": "123",
"_deps": ["../common.config"]}
common.config:
{ "b": "xxx" }
End output:
{ "a": "123",
"b": "xxx" }
2. common config files defined in "_deps" MUST NOT contain identical keys
(otherwise an exception will be thrown), for example (invalid - common1
and common2.config are in the same directory as root.config):
root.config:
{ "a": "123",
"_deps": ["../common1.config",
"../common2.config"]}
common1.config:
{ "b": "xxx" }
common2.config:
{ "b": "yyy" }
3. values in the root config will override the ones in the common config
files. This logic applies to any dependency config file (imagine that
the common config also has "_deps"), thus is recursive.
For example (common.config is in the same directory as root.config):
root.config:
{ "a": "123",
"_deps": ["../common.config"]}
common.config:
{ "a": "456",
"b": "xxx" }
End output:
{ "a": "123",
"b": "xxx" }
"""
DEPS = '_deps'
def __init__(self, config_path=None):
"""Constructor.
@param config_path: String of root config file path.
"""
if config_path:
self.set_config_dir(config_path)
def set_config_dir(self, config_path):
"""Sets config dictionary.
@param config_path: String of config file path.
@raises ConfigJsonIteratorError if config does not exist.
"""
if not os.path.isfile(config_path):
raise ConfigJsonIteratorError('config file does not exist %s'
% config_path)
self._config_dir = os.path.abspath(os.path.dirname(config_path))
def _load_config(self, config_path):
"""Iterate the base config file.
@param config_path: String of config file path.
@return Dictionary of the config file.
@raises ConfigJsonIteratorError: if config file is not found or invalid.
"""
if not os.path.isfile(config_path):
raise ConfigJsonIteratorError('config file does not exist %s'
% config_path)
with open(config_path, 'r') as config_file:
try:
return json.load(config_file)
except ValueError:
raise ConfigJsonIteratorError(
'invalid JSON file %s' % config_file)
def aggregated_config(self, config_path):
"""Returns dictionary of aggregated config files.
The dependency list contains the RELATIVE path to the root config.
@param config_path: String of config file path.
@return Dictionary containing the aggregated config files.
@raises ConfigJsonIteratorError: if dependency config list
does not exist.
"""
ret_dict = self._load_config(config_path)
if ConfigJsonIterator.DEPS not in ret_dict:
return ret_dict
else:
deps_list = ret_dict[ConfigJsonIterator.DEPS]
if not isinstance(deps_list, list):
raise ConfigJsonIteratorError('dependency must be a list %s'
% deps_list)
del ret_dict[ConfigJsonIterator.DEPS]
common_dict = {}
for dep in deps_list:
common_config_path = os.path.join(self._config_dir, dep)
dep_dict = self.aggregated_config(common_config_path)
common_dict = self._merge_dict(common_dict, dep_dict,
allow_override=False)
return self._merge_dict(common_dict, ret_dict, allow_override=True)
def _merge_dict(self, dict_one, dict_two, allow_override=True):
"""Returns a merged dictionary.
@param dict_one: Dictionary to merge (first).
@param dict_two: Dictionary to merge (second).
@param allow_override: Boolean to allow override or not.
@return Dictionary containing merged result.
@raises ConfigJsonIteratorError: if no dictionary given.
"""
if not isinstance(dict_one, dict) or not isinstance(dict_two, dict):
raise ConfigJsonIteratorError('Input is not a dictionary')
if allow_override:
return dict(dict_one.items() + dict_two.items())
else:
merge = collections.Counter(
dict_one.keys() + dict_two.keys()).most_common()[0]
if merge[1] > 1:
raise ConfigJsonIteratorError(
'Duplicate key %s found', merge[0])
return dict_one