autotest: Add structure to support servo-topology.

BUG=b:166680257
TEST=run local

Change-Id: Ibfd1d5e5e6634b5e815555518e912961413f644a
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/2472594
Tested-by: Otabek Kasimov <otabek@google.com>
Reviewed-by: Ruben Rodriguez Buchillon <coconutruben@chromium.org>
Reviewed-by: Garry Wang <xianuowang@chromium.org>
Commit-Queue: Otabek Kasimov <otabek@google.com>
diff --git a/server/cros/servo/topology/__init__.py b/server/cros/servo/topology/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/server/cros/servo/topology/__init__.py
diff --git a/server/cros/servo/topology/common.py b/server/cros/servo/topology/common.py
new file mode 100644
index 0000000..6eebf8b
--- /dev/null
+++ b/server/cros/servo/topology/common.py
@@ -0,0 +1,15 @@
+#!/usr/bin/python2
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# Build relative paths for files with root of autotest_lib.
+
+import os, sys
+dirname = os.path.dirname(sys.modules[__name__].__file__)
+autotest_dir = os.path.abspath(os.path.join(dirname, '../../../..'))
+client_dir = os.path.join(autotest_dir, 'client')
+sys.path.insert(0, client_dir)
+import setup_modules
+sys.path.pop(0)
+setup_modules.setup(base_path=autotest_dir, root_module_name='autotest_lib')
diff --git a/server/cros/servo/topology/servo_topology.py b/server/cros/servo/topology/servo_topology.py
new file mode 100644
index 0000000..56d66fc
--- /dev/null
+++ b/server/cros/servo/topology/servo_topology.py
@@ -0,0 +1,358 @@
+#!/usr/bin/env python2
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from __future__ import print_function
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from __future__ import division
+
+import os
+import copy
+import json
+import base64
+import logging
+
+import common
+from autotest_lib.client.common_lib import hosts
+from autotest_lib.server.cros.servo.topology import topology_constants as stc
+
+
+class ServoTopologyError(Exception):
+    """
+    Generic Exception for failures from ServoTopology object.
+    """
+    pass
+
+
+class MissingServoError(ServoTopologyError):
+    """
+    Exception to throw when child servo type is missing.
+    """
+
+    def __init__(self, message, servo_type):
+        self._servo_type = servo_type
+        self.message = message
+
+    def __str__(self):
+        return repr(self.message)
+
+
+class ServoTopology(object):
+    """Class to read, generate and validate servo topology in the lab.
+
+    The class support detection of servo listed in ST_PRODUCT_TYPES.
+    To save servo topology to host-info date passed two steps:
+       - convert to the json
+       - encode to base64
+    """
+
+    def __init__(self, servo_host):
+        self._host = servo_host
+        self._topology = None
+
+    def read(self, host_info):
+        """Reading servo-topology info."""
+        logging.info('Reading servo topology info...')
+        self._topology = None
+        if not host_info:
+            logging.info('The host_info not provided. Skip reading.')
+            return
+        b64_val = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
+        self._topology = _parse_string_as_topology(b64_val)
+        logging.debug('Loaded servo topology: %s', self._topology)
+        if self._topology:
+            logging.info('Servo topology loaded successfully.')
+
+    def save(self, host_info_store):
+        """Saving servo-topology info."""
+        if self.is_empty():
+            logging.info('Topology is empty. Skip saving.')
+            return
+        if not host_info_store:
+            logging.info('The host_info_store not provided. Skip saving.')
+            return
+        logging.info('Saving servo topology info...')
+        data = _convert_topology_to_string(self._topology)
+        if not data:
+            logging.info('Servo topology fail to save data.'
+                         ' Please file a bug.')
+            return
+        host_info = host_info_store.get()
+        prev_value = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
+        if prev_value and prev_value == data:
+            logging.info('Servo topology was not changed. Skip saving.')
+            return
+        logging.debug('Previous saved topology: %s', prev_value)
+        host_info.set_version_label(stc.SERVO_TOPOLOGY_LABEL_PREFIX, data)
+        host_info_store.commit(host_info)
+        logging.info('Servo topology saved successfully.')
+
+    def generate(self):
+        """Read servo data and create topology."""
+        try:
+            self._topology = self._generate()
+        except Exception as e:
+            self._topology = None
+            logging.debug('(Not critical) %s', e)
+            logging.info('Fail to generate servo-topology')
+        if not self.is_empty():
+            logging.info('Servo topology successfully generated.')
+
+    def is_empty(self):
+        """If topology data was initialized."""
+        return not bool(self._topology)
+
+    def validate(self, raise_error=False, dual_set=False, compare=False):
+        """Validate topology against expected topology.
+
+        Validation against:
+        - set-up expectation: min one child or 2 for DUAL_V4
+        - last saved topology: check if any device missed
+
+        @params raise_error: raise error if validate did not pass otherwise
+                             return False.
+        @params dual_set:    Check if servo expect DUAL_V4 setup.
+        @params compare:     Validate against saved topology.
+        """
+        new_st = self._generate()
+        if not new_st or not new_st.get(stc.ST_DEVICE_MAIN):
+            message = 'Main device is not detected'
+            return self._process_error(message, raise_error)
+        children = new_st.get(stc.ST_DEVICE_CHILDREN)
+        # basic setup has to have minimum one child.
+        if not children or len(children) < 1:
+            message = 'Each setup has at least one child'
+            return self._process_error(message, raise_error)
+        children_types = [c.get(stc.ST_DEVICE_TYPE) for c in children]
+        # DUAL_V4 setup has to have cr50 and one more child.
+        if dual_set:
+            if stc.ST_CR50_TYPE not in children_types:
+                return self._missing_servo_error(stc.ST_CR50_TYPE, raise_error)
+            if len(children) < 2:
+                message = 'Expected two children but have only one'
+                return self._process_error(message, raise_error)
+        if compare and not self.is_empty():
+            main_device = new_st.get(stc.ST_DEVICE_MAIN)
+            t = self._topology
+            old_main = t.get(stc.ST_DEVICE_MAIN)
+            old_children = t.get(stc.ST_DEVICE_CHILDREN)
+            if not old_main or not old_main.get(stc.ST_DEVICE_HUB_PORT):
+                # Old data is invalid for comparasing
+                return True
+            if not self._equal_item(old_main, main_device):
+                message = 'Main servo was changed'
+                return self._process_error(message, raise_error)
+            for child in old_children:
+                old_type = child.get(stc.ST_DEVICE_TYPE)
+                if old_type not in children_types:
+                    return self._missing_servo_error(old_type, raise_error)
+            if len(children) < len(old_children):
+                message = 'Some child is missed'
+                return self._process_error(message, raise_error)
+        logging.info('Servo topology successfully verified.')
+        return True
+
+    def _process_error(self, message, raise_error):
+        if not raise_error:
+            logging.info('Validate servo topology failed with: %s', message)
+            return False
+        raise ServoTopologyError(message)
+
+    def _missing_servo_error(self, servo_type, raise_error):
+        message = 'Missed servo: %s!' % servo_type
+        if not raise_error:
+            logging.info('Validate servo topology failed with: %s', message)
+            return False
+        raise MissingServoError(message, servo_type)
+
+    def _equal_item(self, old, new):
+        """Servo was replugged to another port"""
+        for field in stc.SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS:
+            if old.get(field) != new.get(field):
+                return False
+        return True
+
+    def _generate(self):
+        """Generate and return topology structure.
+
+        Read and generate topology structure with out update the state.
+        """
+        logging.debug('Trying generate a servo-topology')
+        core_servo_serial = self._host.servo_serial
+        if not core_servo_serial:
+            logging.info('Servo serial is not provided.')
+            return None
+        logging.debug('Getting topology for core servo: %s', core_servo_serial)
+        # collect main device info
+        cmd_hub = 'servodtool device -s %s usb-path' % core_servo_serial
+        servo_path = self._read_line(cmd_hub)
+        logging.debug('Device -%s path: %s', core_servo_serial, servo_path)
+        if not servo_path:
+            logging.info('Core servo not detected.')
+            return None
+        if not self._is_expected_type(servo_path):
+            return None
+        main_device = self._read_device_info(servo_path)
+        if not main_device:
+            logging.debug('Core device missed some data')
+            return None
+        # collect child device info
+        children = []
+        hub_path = servo_path[0:-2]
+        logging.debug('Core hub path: %s', hub_path)
+        devices_cmd = 'find %s/* -name serial' % hub_path
+        devices = self._read_multilines(devices_cmd)
+        core_device_port = main_device.get(stc.ST_DEVICE_HUB_PORT)
+        for device in devices:
+            logging.debug('Child device %s', device)
+            device_dir = os.path.dirname(device)
+            if not self._is_expected_type(device_dir):
+                # skip not expected device type like USB or hubs
+                continue
+            child = self._read_device_info(device_dir)
+            if not child:
+                logging.debug('Child missed some data.')
+                continue
+            if core_device_port == child.get(stc.ST_DEVICE_HUB_PORT):
+                logging.debug('Skip device if match with core device')
+                continue
+            children.append(child)
+        topology = {
+                stc.ST_DEVICE_MAIN: main_device,
+                stc.ST_DEVICE_CHILDREN: children
+        }
+        logging.debug('Servo topology: %s', topology)
+        return topology
+
+    def _is_expected_type(self, path):
+        """Check if device type is known servo type.
+
+        Please update ST_PRODUCT_TYPES to extend more servo types.
+        """
+        product = self._read_file(path, 'product')
+        if bool(stc.ST_PRODUCT_TYPES.get(product)):
+            return True
+        logging.info('Unknown product: %s', product)
+        return False
+
+    def _read_device_info(self, path):
+        """Read device details for topology.
+
+        @params path: Absolute path to the device in FS.
+        """
+        serial = self._read_file(path, 'serial')
+        product = self._read_file(path, 'product')
+        hub_path = self._read_file(path, 'devpath')
+        stype = stc.ST_PRODUCT_TYPES.get(product)
+        return self._create_item(serial, stype, product, hub_path)
+
+    def _create_item(self, servo_serial, servo_type, product, hub_path):
+        """Create topology item.
+
+        Return created item only if all details provided.
+
+        @params servo_serial:   Serial number of device.
+        @params servo_type:     Product type code of the device.
+        @params product:        Product name of the device.
+        @params hub_path:       Device enumerated folder name. Show the
+                                chain of used ports to connect the device.
+        """
+        item = {
+                stc.ST_DEVICE_SERIAL: servo_serial,
+                stc.ST_DEVICE_TYPE: servo_type,
+                stc.ST_DEVICE_PRODUCT: product,
+                stc.ST_DEVICE_HUB_PORT: hub_path
+        }
+        if not (servo_serial and servo_type and product and hub_path):
+            logging.debug('Some data missing: %s', item)
+            return None
+        return item
+
+    def _read_file(self, path, file_name):
+        """Read context of the file and return result as one line.
+
+        If execution finished with error result will be empty string.
+
+        @params path:       Path to the folder where file located.
+        @params file_name:  The file name to read.
+        """
+        if not path or not file_name:
+            return ''
+        f = os.path.join(path, file_name)
+        return self._read_line('cat %s' % f)
+
+    def _read_line(self, command):
+        """Execute terminal command and return result as one line.
+
+        If execution finished with error result will be empty string.
+
+        @params command:    String to execute.
+        """
+        r = self._host.run(command, ignore_status=True, timeout=30)
+        if r.exit_status == 0:
+            return r.stdout.strip()
+        return ''
+
+    def _read_multilines(self, command):
+        """Execute terminal command and return result as multi-line.
+
+        If execution finished with error result will be an empty array.
+
+        @params command:    String to execute.
+        """
+        r = self._host.run(command, ignore_status=True, timeout=30)
+        if r.exit_status == 0:
+            return r.stdout.splitlines()
+        return []
+
+
+def _convert_topology_to_string(topology):
+    """Convert topology to the string respresentation.
+
+    Convert topology to json and encode by Base64 for host-info file.
+
+    @params topology: Servo topology data
+    @returns: topology representation in Base64 string
+    """
+    if not topology:
+        return ''
+    try:
+        # generate json similar to golang to avoid extra updates
+        json_string = json.dumps(topology, separators=(',', ':'))
+        logging.debug('Servo topology (json): %s', json_string)
+    except Exception as e:
+        logging.debug('(Not critical) %s', e)
+        logging.info('Failed to convert topology to json')
+        return ''
+    try:
+        # recommended to convert to the bytes for python 3
+        b64_string = base64.b64encode(json_string.encode("utf-8"))
+        logging.debug('Servo topology (b64): %s', b64_string)
+        return b64_string
+    except Exception as e:
+        logging.debug('(Not critical) %s', e)
+        logging.info('Failed to convert topology to base64')
+    return ''
+
+
+def _parse_string_as_topology(src):
+    """Parse and load servo topology from string.
+
+    Decode Base64 and load as json of servo-topology data.
+
+    @params src: topology representation in Base64 string
+    @returns: servo topology data
+    """
+    if not src:
+        logging.debug('Servo topology data not present in host-info.')
+        return None
+    try:
+        json_string = base64.b64decode(src)
+        logging.debug('Servo topology (json) from host-info: %s', json_string)
+        return json.loads(json_string)
+    except Exception as e:
+        logging.debug('(Not critical) %s', e)
+        logging.info('Fail to read servo-topology from host-info.')
+    return None
diff --git a/server/cros/servo/topology/servo_topology_unittest.py b/server/cros/servo/topology/servo_topology_unittest.py
new file mode 100644
index 0000000..890d2f8
--- /dev/null
+++ b/server/cros/servo/topology/servo_topology_unittest.py
@@ -0,0 +1,123 @@
+#!/usr/bin/python2
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import mock
+import unittest
+
+import common
+from autotest_lib.server.cros.servo.topology import topology_constants as stc
+from autotest_lib.server.cros.servo.topology import servo_topology
+
+# pylint: disable=missing-docstring
+
+
+class ConvertDataToStringTests(unittest.TestCase):
+    def test_expect_empty_when_data_is_none(self):
+        data = servo_topology._convert_topology_to_string(None)
+        self.assertEqual(data, '')
+
+    def test_expect_empty_when_data_is_empty(self):
+        data = servo_topology._convert_topology_to_string({})
+        self.assertEqual(data, '')
+
+    def test_expect_string_when_data_is_provided(self):
+        expected_string = (
+                'eyJtYWluIjp7InVzYl9odWJfcG9ydCI6IjYuNC4xIiwic2VyaWFsIjoiQzE'
+                '5MDMxNDU1OTEhQCMkJV4qKClcIic6OycnXFw/L2BcIiwiLCJ0eXBlIjoic2'
+                'Vydm9fdjQiLCJzeXNmc19wcm9kdWN0IjoiU2Vydm8gVjQifSwiY2hpbGRyZ'
+                'W4iOlt7InVzYl9odWJfcG9ydCI6IjYuNC4yIiwic2VyaWFsIjoiMDY4MUQw'
+                'M0EtOTJEQ0NENjQiLCJ0eXBlIjoiY2NkX2NyNTAiLCJzeXNmc19wcm9kdWN'
+                '0IjoiQ3I1MCJ9XX0=')
+        data = {
+                "main": {
+                        "usb_hub_port": "6.4.1",
+                        "serial": "C1903145591!@#$%^*()\"':;''\\?/`\",",
+                        "type": "servo_v4",
+                        "sysfs_product": "Servo V4"
+                },
+                "children": [{
+                        "usb_hub_port": "6.4.2",
+                        "serial": "0681D03A-92DCCD64",
+                        "type": "ccd_cr50",
+                        "sysfs_product": "Cr50"
+                }]
+        }
+
+        result = servo_topology._convert_topology_to_string(data)
+        self.assertEqual(result, expected_string)
+
+    def test_expect_string_when_data_is_provided2(self):
+        expected_string = (
+                'eyJtYWluIjp7InVzYl9odWJfcG9ydCI6IiIsInNlcmlhbCI6IiIsInR5cGU'
+                'iOiIiLCJzeXNmc19wcm9kdWN0IjoiIn0sImNoaWxkcmVuIjpbXX0=')
+        data = {
+                "main": {
+                        "usb_hub_port": "",
+                        "serial": "",
+                        "type": "",
+                        "sysfs_product": ""
+                },
+                "children": []
+        }
+
+        result = servo_topology._convert_topology_to_string(data)
+        self.assertEqual(result, expected_string)
+
+
+class ParseStringToDataTests(unittest.TestCase):
+    def test_expect_none_when_string_empty(self):
+        data = servo_topology._parse_string_as_topology("")
+        self.assertEqual(data, None)
+
+    def test_expect_empty_data_when_string_empty_instance(self):
+        data = servo_topology._parse_string_as_topology('{}')
+        self.assertEqual(data, None)
+
+    def test_expect_data_when_string_is_provided(self):
+        b64_data = (
+                'eyJtYWluIjp7InVzYl9odWJfcG9ydCI6IjYuNC4xIiwic2VyaWFsIjoiQzE5'
+                'MDMxNDU1OTEhQCMkJV4mKigpXCInOjsnJ1xcPy9gflwiLCIsInR5cGUiOiJz'
+                'ZXJ2b192NCIsInN5c2ZzX3Byb2R1Y3QiOiJTZXJ2byBWNCJ9LCJjaGlsZHJl'
+                'biI6W3sidXNiX2h1Yl9wb3J0IjoiNi40LjIiLCJzZXJpYWwiOiIwNjgxRDAz'
+                'QS05MkRDQ0Q2NCIsInR5cGUiOiJjY2RfY3I1MCIsInN5c2ZzX3Byb2R1Y3Qi'
+                'OiJDcjUwIn1dfQ==')
+        data = {
+                "main": {
+                        "usb_hub_port": "6.4.1",
+                        "serial": "C1903145591!@#$%^&*()\"':;''\\?/`~\",",
+                        "type": "servo_v4",
+                        "sysfs_product": "Servo V4"
+                },
+                "children": [{
+                        "usb_hub_port": "6.4.2",
+                        "serial": "0681D03A-92DCCD64",
+                        "type": "ccd_cr50",
+                        "sysfs_product": "Cr50"
+                }]
+        }
+
+        result = servo_topology._parse_string_as_topology(b64_data)
+        self.assertEqual(result, data)
+
+    def test_expect_data_when_string_is_provided2(self):
+        b64_data = (
+                'eyJtYWluIjp7InVzYl9odWJfcG9ydCI6IiIsInNlcmlhbCI6IiIsInR5cGU'
+                'iOiIiLCJzeXNmc19wcm9kdWN0IjoiIn0sImNoaWxkcmVuIjpbXX0=')
+        data = {
+                "main": {
+                        "usb_hub_port": "",
+                        "serial": "",
+                        "type": "",
+                        "sysfs_product": ""
+                },
+                "children": []
+        }
+
+        result = servo_topology._parse_string_as_topology(b64_data)
+        self.assertEqual(result, data)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/server/cros/servo/topology/topology_constants.py b/server/cros/servo/topology/topology_constants.py
new file mode 100644
index 0000000..6702a5e
--- /dev/null
+++ b/server/cros/servo/topology/topology_constants.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python2
+# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+# Constants that will be used as key name in device health profile.
+SERVO_TOPOLOGY_LABEL_PREFIX = 'servo_topology'
+ST_DEVICE_MAIN = 'main'
+ST_DEVICE_CHILDREN = 'children'
+
+# Fields of servo topology item
+ST_DEVICE_SERIAL = 'serial'
+ST_DEVICE_TYPE = 'type'
+ST_DEVICE_PRODUCT = 'sysfs_product'
+ST_DEVICE_HUB_PORT = 'usb_hub_port'
+
+ST_CR50_TYPE = 'ccd_cr50'
+
+# Mapping between product names and types.
+ST_PRODUCT_TYPES = {
+        'Servo V4': 'servo_v4',
+        'Servo V4p1': 'servo_v4p1',
+        'Cr50': ST_CR50_TYPE,
+        'Servo Micro': 'servo_micro',
+        'C2D2': 'c2d2',
+        'Sweetberry': 'sweetberry'
+}
+
+# List unchangeable fields per device.
+SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS = (
+        ST_DEVICE_SERIAL,
+        ST_DEVICE_TYPE,
+        ST_DEVICE_PRODUCT,
+)