crostestutils: Initial helper script to update METADATA in tauto

A script which, when run locally in a repo, will make local edits
to tauto control files matching the desired parameters.

The only functionality implemented here is:
- removing given emails from 'contacts'
- modifying only specific tests

BUG=None
TEST=tests included; also used it to remove invalid emails

Change-Id: Idf88765d3dab932b8481fc24c069e17128fa4b85
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crostestutils/+/4671169
Commit-Queue: Katherine Threlkeld <kathrelkeld@chromium.org>
Tested-by: Katherine Threlkeld <kathrelkeld@chromium.org>
Reviewed-by: Jesse McGuire <jessemcguire@google.com>
Reviewed-by: Afshin Sadrieh <asadrieh@google.com>
diff --git a/metadata_modifier/tauto_modify/README b/metadata_modifier/tauto_modify/README
new file mode 100644
index 0000000..377ac16
--- /dev/null
+++ b/metadata_modifier/tauto_modify/README
@@ -0,0 +1,13 @@
+# Copyright 2023 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+
+Running this code:
+There is no CLI yet.  Edit main() in tauto_modify.py to choose
+what should be done to control files (an "action") and how to
+select which control files to act on (a "filter"), then run:
+`python tauto_modify.py`
+
+Modifying this code:
+`python -m unittest` to run all unit tests
diff --git a/metadata_modifier/tauto_modify/actions.py b/metadata_modifier/tauto_modify/actions.py
new file mode 100644
index 0000000..b3549c4
--- /dev/null
+++ b/metadata_modifier/tauto_modify/actions.py
@@ -0,0 +1,28 @@
+# Copyright 2023 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Functions which return "actions" that modify a given ControlFile object."""
+
+# Each action should take in a single ControlFile and return a boolean value
+# of whether the control file was modified or not.
+
+def remove_contacts(emails):
+    """Return an action which removes the given list of emails from 'contacts'.
+
+    Args:
+        emails: a list of strings, e.g. ['foo@google.com']
+
+    Returns:
+        An action function that acts on a ControlFile and returns a boolean.
+    """
+    def output(cf):
+        """Drop every listed email (duplicates too); True if any was removed."""
+        contacts = cf.metadata.get('contacts')
+        if not contacts:
+            return False
+        kept = [email for email in contacts if email not in emails]
+        modified = len(kept) != len(contacts)
+        contacts[:] = kept # Edit in place so cf.metadata sees the change.
+        return modified
+    return output
diff --git a/metadata_modifier/tauto_modify/cf_parse.py b/metadata_modifier/tauto_modify/cf_parse.py
new file mode 100644
index 0000000..da0b3f3
--- /dev/null
+++ b/metadata_modifier/tauto_modify/cf_parse.py
@@ -0,0 +1,143 @@
+# Copyright 2023 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Functions to parse and modify control files when updating METADATA."""
+
+import ast
+import json
+
+def format_string_value(s, indent=8, target_length=60):
+    """Format a string value for printing in a control file.
+
+    Args:
+        s: the string to be formatted
+        indent: the indent when multiple lines are needed
+        target_length: the target line length for the file
+
+    Returns:
+        A double-quoted literal, parenthesized and wrapped if it is long.
+    """
+    value = ' '.join(s.split()) # Remove excess spaces/newlines.
+    # Escape backslashes first, then quotes, so the literal stays valid.
+    value = value.replace('\\', '\\\\').replace('"', '\\"')
+    if len(value) < target_length:
+        return f'"{value}"'
+    printed_lines, curr = [], ''
+    for word in value.split(' '):
+        if len(curr) + len(word) + 1 < target_length:
+            curr += word + ' '
+            continue
+        if curr: # Skip the empty line an over-long first word would add.
+            printed_lines.append(curr.strip())
+        curr = word + ' '
+    printed_lines.append(curr.strip())
+    # NOTE(review): wrapped pieces drop the joining space; confirm intended.
+    join_value = '"\n' + ' ' * indent + '"'
+    return f'("{join_value.join(printed_lines)}")'
+
+def format_list_value(l, indent=8):
+    """Render a list as control-file text, one item per line if needed.
+
+    Args:
+        l: the list to render
+        indent: number of spaces placed before each item on its own line
+
+    Returns:
+        The JSON text of the list; lists of two or more items are split
+        across lines, ending with a trailing comma and an indented bracket.
+    """
+    multiline = len(l) > 1
+    if not multiline:
+        return json.dumps(l)
+    # Close with a trailing comma, e.g.: [\n"foo",\n        "bar",\n    ]
+    return json.dumps(l, indent=indent).replace('\n]', ',\n    ]')
+
+def format_metadata(metadata_dict):
+    """Return a formatted string representing the given metadata.
+
+    Args:
+        metadata_dict: the metadata to be formatted
+
+    Returns:
+        A formatted string of the form 'METADATA = {...}'. This output can be
+        swapped in with the previous METADATA declaration in a file.
+    """
+    entries = []
+    for key, value in metadata_dict.items():
+        if isinstance(value, bool) or value is None:
+            printed_value = repr(value) # Python spelling, not JSON's.
+        elif isinstance(value, list):
+            printed_value = format_list_value(value)
+        elif isinstance(value, str):
+            printed_value = format_string_value(value, indent=len(key)+9)
+        else:
+            printed_value = json.dumps(value)
+        entries.append(f'    "{key}": {printed_value},\n')
+    return f'METADATA = {{\n{"".join(entries)}}}'
+
+
+class ControlFile():
+    """Class representing a Control file to be edited (or skipped)."""
+    def __init__(self, path):
+        self.path = path
+        self.name_value = '' # The string assigned to NAME, if any
+        self.contents = '' # The contents of the file
+        self.metadata_start = -1 # The index of the M in METADATA
+        self.metadata_end = -1 # The index after the closing }
+        self.isChanged = False
+
+        self.metadata = {} # The evaluated METADATA dict
+        self.is_valid = self.find_metadata_elt()
+
+    def find_metadata_elt(self):
+        """Parse the file and locate METADATA = ..., if present.
+
+        Returns:
+            True if a usable metadata declaration was found, else False
+            (also when the file is empty or cannot be parsed).
+        """
+        with open(self.path, encoding='utf-8') as f:
+            self.contents = f.read()
+        try:
+            parsed_file = ast.parse(self.contents)
+        except (SyntaxError, ValueError):
+            return False # Not parseable as Python; skip this file.
+
+        decl = None
+        for elt in parsed_file.body:
+            if not (isinstance(elt, ast.Assign) and
+                    isinstance(elt.targets[0], ast.Name)):
+                continue
+            target_id = elt.targets[0].id
+            if target_id == 'METADATA' and isinstance(elt.value, ast.Dict):
+                decl = elt
+            if (target_id == 'NAME' and
+                isinstance(elt.value, ast.Constant) and
+                isinstance(elt.value.value, str)):
+                self.name_value = elt.value.value
+
+        if decl is None:
+            return False
+        try:
+            self.metadata = ast.literal_eval(decl.value)
+        except (ValueError, TypeError):
+            return False # METADATA holds something that is not a literal.
+
+        # ast reports (line, UTF-8 byte column) pairs, while slicing
+        # self.contents needs character offsets into the whole file.
+        lines = self.contents.split('\n')
+        def offset(lineno, col):
+            before = sum(len(line) + 1 for line in lines[:lineno - 1])
+            line_bytes = lines[lineno - 1].encode('utf-8')
+            return before + len(line_bytes[:col].decode('utf-8'))
+        self.metadata_start = offset(decl.lineno, decl.col_offset)
+        self.metadata_end = offset(decl.end_lineno, decl.end_col_offset)
+        return True
+
+    def update_contents(self):
+        """Modify self.contents using the modified values in self.metadata."""
+        new_metadata = format_metadata(self.metadata)
+        self.contents = (self.contents[:self.metadata_start] +
+                         new_metadata +
+                         self.contents[self.metadata_end:])
diff --git a/metadata_modifier/tauto_modify/filters.py b/metadata_modifier/tauto_modify/filters.py
new file mode 100644
index 0000000..fd2dcb4
--- /dev/null
+++ b/metadata_modifier/tauto_modify/filters.py
@@ -0,0 +1,33 @@
+# Copyright 2023 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Functions which return "filters" for control files."""
+
+# Each filter should take in a single ControlFile and return a boolean value
+# of whether the control file matches the desired filter.
+
+def all_tests():
+    """Build a filter that accepts every valid control file.
+
+    Returns:
+        A filter function which takes a ControlFile and returns its is_valid.
+    """
+    def accept_valid(cf):
+        return cf.is_valid
+    return accept_valid
+
+def test_list(tests):
+    """Build a filter matching only control files whose test id is listed.
+
+    Args:
+        tests: A list of test ids, each prefixed with "tauto.", e.g.
+               ["tauto.test1", "tauto.test2"].
+
+    Returns:
+        A filter function which takes a ControlFile and returns a boolean.
+    """
+    def is_listed(cf):
+        # A control file without a NAME can never match.
+        return cf.name_value != '' and 'tauto.' + cf.name_value in tests
+    return is_listed
diff --git a/metadata_modifier/tauto_modify/tauto_modify.py b/metadata_modifier/tauto_modify/tauto_modify.py
new file mode 100644
index 0000000..d596068
--- /dev/null
+++ b/metadata_modifier/tauto_modify/tauto_modify.py
@@ -0,0 +1,58 @@
+# Copyright 2023 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Helper script to parse autotest control files for data gathering."""
+
+import os
+import pathlib
+
+import actions
+import cf_parse
+import filters
+
+
+# ChromeOS src/ dir, reached by walking five '..' segments up from this file.
+SRC_DIR = pathlib.Path(__file__).joinpath('../../../../..').resolve()
+
+def modify_control_files(action_func_list, filter_func_list, dry_run):
+    """Apply every action to each control file that passes every filter.
+
+    Args:
+        action_func_list: actions; each edits a ControlFile, returns a bool
+        filter_func_list: filters; each inspects a ControlFile, returns a bool
+        dry_run: if True, print the would-be contents instead of writing
+    """
+    # Places to look for control files, relative to SRC_DIR.
+    AUTOTEST_DIRS = [
+            'third_party/autotest/files/client/site_tests/',
+            'third_party/autotest/files/server/site_tests/',
+            'third_party/autotest-private/client/site_tests/',
+    ]
+    for tests_dir in [pathlib.Path(SRC_DIR, d) for d in AUTOTEST_DIRS]:
+        for cf_path in tests_dir.glob('*/control*'):
+            cf = cf_parse.ControlFile(cf_path)
+            if not (cf.is_valid and all(f(cf) for f in filter_func_list)):
+                continue # No usable METADATA, or rejected by a filter.
+            # Run all actions; a bare any(...) generator would stop early.
+            if not any([action(cf) for action in action_func_list]):
+                continue
+            cf.update_contents()
+            if dry_run:
+                print(f'Will modify {cf_path}:')
+                print(cf.contents)
+                continue
+            print(f'Editing {cf_path}')
+            with open(cf.path, 'w', encoding='utf-8') as f:
+                f.write(cf.contents)
+
+
+def main():
+    """Run the action/filter pair configured below, as a dry run."""
+    os.chdir(SRC_DIR)
+    wanted_actions = [actions.remove_contacts(['notarealemail@google.com'])]
+    wanted_filters = [filters.all_tests()]
+    modify_control_files(wanted_actions, wanted_filters, dry_run=True)
+
+if __name__ == '__main__':
+    main()
diff --git a/metadata_modifier/tauto_modify/test_actions.py b/metadata_modifier/tauto_modify/test_actions.py
new file mode 100644
index 0000000..4afc08f
--- /dev/null
+++ b/metadata_modifier/tauto_modify/test_actions.py
@@ -0,0 +1,34 @@
+# Copyright 2023 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for actions."""
+
+import os
+import unittest
+
+import actions
+import cf_parse
+
+TEST_DATA_DIR = 'test_data/'
+
+class TestActions(unittest.TestCase):
+    """Tests for actions."""
+    def test_remove_contacts(self):
+        """An email is removed once; a second pass reports no change."""
+        target = 'removable@google.com'
+        remove_target = actions.remove_contacts([target])
+        cf = cf_parse.ControlFile(
+                os.path.join(TEST_DATA_DIR, 'control.actions'))
+
+        self.assertIn('contacts', cf.metadata)
+        self.assertIn(target, cf.metadata['contacts'])
+        self.assertTrue(remove_target(cf))
+        self.assertIn('contacts', cf.metadata)
+        self.assertNotIn(target, cf.metadata['contacts'])
+
+        # Nothing is left to remove, so the action must report False.
+        self.assertFalse(remove_target(cf))
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/metadata_modifier/tauto_modify/test_cf_parse.py b/metadata_modifier/tauto_modify/test_cf_parse.py
new file mode 100644
index 0000000..5c42a87
--- /dev/null
+++ b/metadata_modifier/tauto_modify/test_cf_parse.py
@@ -0,0 +1,75 @@
+# Copyright 2023 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for cf_parse."""
+
+import os
+import unittest
+
+import cf_parse
+
+TEST_DATA_DIR = 'test_data/'
+
+class TestControlFileParse(unittest.TestCase):
+    """Tests for cf_parse."""
+    def test_parse(self):
+        parsed = cf_parse.ControlFile(
+                os.path.join(TEST_DATA_DIR, 'control.cf_parse'))
+        self.assertEqual(parsed.name_value, 'fake_test')
+        self.assertEqual(parsed.metadata_start, 19)
+        self.assertEqual(parsed.contents[parsed.metadata_start], 'M')
+        self.assertEqual(parsed.metadata_end, 93)
+        self.assertEqual(parsed.contents[parsed.metadata_end], '\n')
+
+    def test_format_string(self):
+        self.assertEqual(str(cf_parse.format_string_value('foo')), '"foo"')
+
+        # Long values are re-flowed and wrapped in parentheses.
+        wrapped = cf_parse.format_string_value(
+                '123456789 123456 890123456 8901234 67890'
+                '1234 6789012\n 4567890 234567890 234567890'
+                '1234 6789012345678901234567\n 901234567890')
+        self.assertEqual(wrapped, (
+                '("123456789 123456 890123456 8901234 678901234 6789012"\n'
+                '        "4567890 234567890 2345678901234 '
+                '6789012345678901234567"\n'
+                '        "901234567890")'))
+
+    def test_format_list(self):
+        # A single item stays on one line.
+        self.assertEqual(cf_parse.format_list_value(['foo']), '["foo"]')
+
+        # Several items go one per line, with a trailing comma.
+        multi = cf_parse.format_list_value(['foo', 'bar'])
+        self.assertEqual(multi, '[\n        "foo",\n        "bar",\n    ]')
+
+    def test_format_metadata(self):
+        # One value of each kind: str, int, bool and a one-item list.
+        formatted = cf_parse.format_metadata({
+                'foo': 'bar',
+                'num': 4,
+                'bool': True,
+                'lnum': [1],
+        })
+        expected_lines = [
+                'METADATA = {',
+                '    "foo": "bar",',
+                '    "num": 4,',
+                '    "bool": True,',
+                '    "lnum": [1],',
+        ]
+        self.assertEqual(formatted, '\n'.join(expected_lines) + '\n}')
+
+    def test_update_contents(self):
+        cf = cf_parse.ControlFile(
+                os.path.join(TEST_DATA_DIR, 'control.cf_parse'))
+        cf.metadata = {}
+        cf.update_contents()
+        # Only the METADATA declaration is replaced; NAME is untouched.
+        self.assertEqual(
+                cf.contents,
+                'NAME = "fake_test"\nMETADATA = {\n}\n')
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/metadata_modifier/tauto_modify/test_data/control.actions b/metadata_modifier/tauto_modify/test_data/control.actions
new file mode 100644
index 0000000..a6b51b3
--- /dev/null
+++ b/metadata_modifier/tauto_modify/test_data/control.actions
@@ -0,0 +1,4 @@
+NAME = "actions_test"
+METADATA = {
+    'contacts': ['replaceable@google.com', 'removable@google.com']
+}
diff --git a/metadata_modifier/tauto_modify/test_data/control.cf_parse b/metadata_modifier/tauto_modify/test_data/control.cf_parse
new file mode 100644
index 0000000..1a86166
--- /dev/null
+++ b/metadata_modifier/tauto_modify/test_data/control.cf_parse
@@ -0,0 +1,4 @@
+NAME = "fake_test"
+METADATA = {
+    'contacts': ['name@google.com', 'removable@google.com']
+}
diff --git a/metadata_modifier/tauto_modify/test_data/control.filter_name b/metadata_modifier/tauto_modify/test_data/control.filter_name
new file mode 100644
index 0000000..56294bd
--- /dev/null
+++ b/metadata_modifier/tauto_modify/test_data/control.filter_name
@@ -0,0 +1,4 @@
+NAME = "filterable_name"
+METADATA = {
+    'contacts': ['replaceable@google.com', 'removable@google.com']
+}
diff --git a/metadata_modifier/tauto_modify/test_filters.py b/metadata_modifier/tauto_modify/test_filters.py
new file mode 100644
index 0000000..816ce31
--- /dev/null
+++ b/metadata_modifier/tauto_modify/test_filters.py
@@ -0,0 +1,28 @@
+# Copyright 2023 The ChromiumOS Authors
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for filters."""
+
+import os
+import unittest
+
+import filters
+import cf_parse
+
+TEST_DATA_DIR = 'test_data/'
+
+class TestFilters(unittest.TestCase):
+    """Tests for filters."""
+    def test_list_of_tests(self):
+        """Only the control file whose NAME is listed passes the filter."""
+        def load(filename):
+            return cf_parse.ControlFile(os.path.join(TEST_DATA_DIR, filename))
+        only_filterable = filters.test_list(['tauto.filterable_name'])
+
+        # control.filter_name is listed; control.actions is not.
+        self.assertTrue(only_filterable(load('control.filter_name')))
+        self.assertFalse(only_filterable(load('control.actions')))
+
+if __name__ == '__main__':
+    unittest.main()