autotest: Add module for loading dynamic configs.
BUG=b:182083461
TEST=unittest
Change-Id: Ifa04321693eb58f9da05c7fae898a9d50aa6039d
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/3099252
Auto-Submit: Yury Ershov <ershov@google.com>
Reviewed-by: Yury Ershov <ershov@google.com>
Reviewed-by: Alex Bergman <abergman@google.com>
Tested-by: Yury Ershov <ershov@google.com>
Tested-by: Alex Bergman <abergman@google.com>
Commit-Queue: Yury Ershov <ershov@google.com>
diff --git a/client/common_lib/config_vars.py b/client/common_lib/config_vars.py
new file mode 100644
index 0000000..2fcc5c3
--- /dev/null
+++ b/client/common_lib/config_vars.py
@@ -0,0 +1,221 @@
+# Lint as: python2, python3
+# Copyright (c) 2021 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""
+Functions to load config variables from JSON with transformation.
+
+* The config is a key-value dictionary.
+* If the value is a list, then the list constitutes a list of conditions
+ to check.
+* A condition is a key-value dictionary where the key is an external variable
+ name and the value is a case-insensitive regexp to match. If multiple
+ variables are used, they all must match for the condition to succeed.
+* A special key "value" is the value to assign to the variable.
+* The first matching condition wins.
+* A condition with zero external vars always succeeds - it should be the last
+ in the list, as the last-resort case.
+* If no condition matches, it's an error.
+* The value, in turn, can be a nested list of conditions.
+
+Example:
+ Python source:
+ config = TransformJsonFile(
+ "config.json",
+ extvars={
+ "board": "board1",
+ "model": "model1",
+ })
+ # config -> {
+ # "cuj_username": "user",
+ # "private_key": "SECRET",
+ # "some_var": "val for board1",
+ # "some_var2": "val2",
+ # }
+
+ config = TransformJsonFile(
+ "config.json",
+ extvars={
+ "board": "board2",
+ "model": "model2",
+ })
+ # config -> {
+ # "cuj_username": "user",
+ # "private_key": "SECRET",
+ # "some_var": "val for board2",
+ # "some_var2": "val2 for board2 model2",
+ # }
+
+ config.json:
+ {
+ "cuj_username": "user",
+ "private_key": "SECRET",
+ "some_var": [
+ {
+ "board": "board1.*",
+ "value": "val for board1"
+ },
+ {
+ "board": "board2.*",
+ "value": "val for board2"
+ },
+ {
+ "value": "val for board2"
+ }
+ ],
+ "some_var2": [
+ {
+ "board": "board2.*",
+ "model": "model2.*",
+ "value": "val2 for board2 model2"
+ },
+ {
+ "value": "val2"
+ }
+ ]
+ }
+
+"""
+
+# Lint as: python2, python3
+# pylint: disable=missing-docstring,bad-indentation
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import json
+import logging
+import re
+
+try:
+ unicode
+except NameError:
+ unicode = str
+
+VERBOSE = False
+
+
+class ConfigTransformError(ValueError):
+ pass
+
+
+def TransformConfig(data, extvars):
+ """Transforms data loaded from JSON to config variables.
+
+ Args:
+ data (dict): input data dictionary from JSON parser
+ extvars (dict): external variables dictionary
+
+ Returns:
+ dict: config variables
+
+ Raises:
+ ConfigTransformError: transformation error
+ 're' errors
+ """
+ if not isinstance(data, dict):
+ _Error('Top level configuration object must be a dictionary but got ' +
+ data.__class__.__name__)
+
+ return {key: _GetVal(val, extvars) for key, val in data.items()}
+
+
+def TransformJsonText(text, extvars):
+ """Transforms JSON text to config variables.
+
+ Args:
+ text (str): JSON input
+ extvars (dict): external variables dictionary
+
+ Returns:
+ dict: config variables
+
+ Raises:
+ ConfigTransformError: transformation error
+ 're' errors
+ 'json' errors
+ """
+ data = json.loads(text)
+ return TransformConfig(data, extvars)
+
+
+def TransformJsonFile(file_name, extvars):
+ """Transforms JSON file to config variables.
+
+ Args:
+ file_name (str): JSON file name
+ extvars (dict): external variables dictionary
+
+ Returns:
+ dict: config variables
+
+ Raises:
+ ConfigTransformError: transformation error
+ 're' errors
+ 'json' errors
+ IO errors
+ """
+ with open(file_name, 'r') as f:
+ data = json.load(f)
+ return TransformConfig(data, extvars)
+
+
+def _GetVal(val, extvars):
+ """Calculates and returns the config variable value.
+
+ Args:
+ val (str | int | float | list): variable value or conditions list
+ extvars (dict): external variables dictionary
+
+ Returns:
+ str | int | float: resolved variable value
+
+ Raises:
+ ConfigTransformError: transformation error
+ """
+ if (isinstance(val, str) or isinstance(val, unicode)
+ or isinstance(val, int) or isinstance(val, float)):
+ return val
+
+ if not isinstance(val, list):
+ _Error('Conditions must be an array but got ' + val.__class__.__name__,
+ json.dumps(val))
+
+ for cond in val:
+ if not isinstance(cond, dict):
+ _Error(
+ 'Condition must be a dictionary but got ' +
+ cond.__class__.__name__, json.dumps(cond))
+ if 'value' not in cond:
+ _Error('Missing mandatory "value" key from condition',
+ json.dumps(cond))
+
+ for cond_key, cond_val in cond.items():
+ if cond_key == 'value':
+ continue
+ if cond_key not in extvars:
+ logging.warning('Ignored unknown external var: %s', cond_key)
+ break
+ if re.search(cond_val, extvars[cond_key], re.I) is None:
+ break
+ else:
+ return _GetVal(cond['value'], extvars)
+
+ _Error('Condition did not match any external vars',
+ json.dumps(val, indent=4) + '\nvars: ' + extvars.__str__())
+
+
+def _Error(text, extra=''):
+ """Logs the error and raises ConfigTransformError.
+
+ Args:
+ text (str): Error text
+ extra (str, optional): potentially sensitive error text for verbose output
+
+ Raises:
+ ConfigTransformError: error
+ """
+ if VERBOSE:
+ text += ':\n' + extra
+ logging.error('%s', text)
+ raise ConfigTransformError(text)
diff --git a/client/common_lib/config_vars_unittest.py b/client/common_lib/config_vars_unittest.py
new file mode 100755
index 0000000..4e49c15
--- /dev/null
+++ b/client/common_lib/config_vars_unittest.py
@@ -0,0 +1,278 @@
+#!/usr/bin/python2
+# Lint as: python2, python3
+# pylint: disable=missing-docstring,bad-indentation
+
+import common
+import unittest
+import logging
+
+from autotest_lib.client.common_lib.config_vars import TransformJsonText, ConfigTransformError
+
+
+class ConfigVarsTransformTestCase(unittest.TestCase):
+ def testSimple(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz"
+ }""", {"qwe": "asd"}), {'a': 'zzz'})
+
+ def testSimpleCond(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "asd",
+ "value": "vvvvv"
+ }
+ ]
+ }""", {"AAA": "asd"}), {
+ 'a': 'zzz',
+ 'b': 'vvvvv'
+ })
+
+ def testSimpleCond2(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "value": "vvvvv"
+ }
+ ]
+ }""", {"AAA": "asd"}), {
+ 'a': 'zzz',
+ 'b': 'vvvvv'
+ })
+
+ def testSimpleCondFallback(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "xxx",
+ "value": "vvvvv1"
+ },
+ {
+ "AAA": "yyy",
+ "value": "vvvvv2"
+ },
+ {
+ "value": "vvvvv3"
+ }
+ ]
+ }""", {"AAA": "asd"}), {
+ 'a': 'zzz',
+ 'b': 'vvvvv3'
+ })
+
+ def testNoMatch(self):
+ logging.disable(logging.CRITICAL)
+ self.assertRaises(
+ ConfigTransformError, TransformJsonText, """{
+ "a": "zzz",
+ "b": [
+ {
+ "XXX": "asd",
+ "value": "vvvvv"
+ }
+ ]
+ }""", {"AAA": "asd"})
+ logging.disable(logging.NOTSET)
+
+ def testUnmatch(self):
+ logging.disable(logging.CRITICAL)
+ self.assertRaises(
+ ConfigTransformError, TransformJsonText, """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "zzz",
+ "value": "vvvvv"
+ }
+ ]
+ }""", {"AAA": "asd"})
+ logging.disable(logging.NOTSET)
+
+ def testMatchFirst(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "asd",
+ "value": "vvvvv1"
+ },
+ {
+ "AAA": "asd",
+ "value": "vvvvv2"
+ }
+ ]
+ }""", {"AAA": "asd"}), {
+ 'a': 'zzz',
+ 'b': 'vvvvv1'
+ })
+
+ def testMatchMid(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "zzz",
+ "value": "vvvvv1"
+ },
+ {
+ "AAA": "asd",
+ "BBB": "jjj",
+ "value": "vvvvv2"
+ },
+ {
+ "AAA": "asd",
+ "BBB": "zxc",
+ "value": "vvvvv3"
+ },
+ {
+ "AAA": "asd",
+ "BBB": "zxc",
+ "CCC": "qwe",
+ "value": "vvvvv4"
+ }
+ ]
+ }""", {
+ "AAA": "asd",
+ "BBB": "zxc",
+ "CCC": "qwe"
+ }), {
+ 'a': 'zzz',
+ 'b': 'vvvvv3'
+ })
+
+ def testMatchLast(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "zzz",
+ "value": "vvvvv1"
+ },
+ {
+ "AAA": "asd",
+ "BBB": "jjj",
+ "value": "vvvvv2"
+ },
+ {
+ "AAA": "asd",
+ "BBB": "zxc",
+ "CCC": "jjj",
+ "value": "vvvvv3"
+ },
+ {
+ "AAA": "asd",
+ "BBB": "zxc",
+ "CCC": "qwe",
+ "value": "vvvvv4"
+ }
+ ]
+ }""", {
+ "AAA": "asd",
+ "BBB": "zxc",
+ "CCC": "qwe"
+ }), {
+ 'a': 'zzz',
+ 'b': 'vvvvv4'
+ })
+
+ def testNested(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "asd",
+ "value": [
+ {
+ "BBB": "zxc",
+ "value": [
+ {
+ "CCC": "qwe",
+ "value": "vvvvv4"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }""", {
+ "AAA": "asd",
+ "BBB": "zxc",
+ "CCC": "qwe"
+ }), {
+ 'a': 'zzz',
+ 'b': 'vvvvv4'
+ })
+
+ def testRegex(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "^a.*",
+ "value": "vvvvv"
+ }
+ ]
+ }""", {"AAA": "asd"}), {
+ 'a': 'zzz',
+ 'b': 'vvvvv'
+ })
+
+ def testRegexCase(self):
+ self.assertDictEqual(
+ TransformJsonText(
+ """{
+ "a": "zzz",
+ "b": [
+ {
+ "AAA": "^A.*D$",
+ "value": "vvvvv"
+ }
+ ]
+ }""", {"AAA": "asd"}), {
+ 'a': 'zzz',
+ 'b': 'vvvvv'
+ })
+
+ def testEmptyInput(self):
+ self.assertRaises(ValueError, TransformJsonText, '', {"qwe": "asd"})
+
+ def testMalformedJson(self):
+ self.assertRaises(ValueError, TransformJsonText, '{qwe',
+ {"qwe": "asd"})
+
+ def testNonObjectTopLevelJson(self):
+ logging.disable(logging.CRITICAL)
+ self.assertRaises(ConfigTransformError, TransformJsonText, '[1, 2, 3]',
+ {"qwe": "asd"})
+ logging.disable(logging.NOTSET)
+
+ def testNonObjectTopLevelJson2(self):
+ logging.disable(logging.CRITICAL)
+ self.assertRaises(ConfigTransformError, TransformJsonText, '"wwwww"',
+ {"qwe": "asd"})
+ logging.disable(logging.NOTSET)
+
+
+if __name__ == '__main__':
+ unittest.main()