blob: 56d66fc8ba4831cc45605390e22d46800656fd7c [file] [log] [blame]
#!/usr/bin/env python2
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import division
import os
import copy
import json
import base64
import logging
import common
from autotest_lib.client.common_lib import hosts
from autotest_lib.server.cros.servo.topology import topology_constants as stc
class ServoTopologyError(Exception):
    """Generic exception for failures reported by the ServoTopology object."""


class MissingServoError(ServoTopologyError):
    """Exception raised when an expected child servo type is missing.

    @params message: Human readable description of the failure.
    @params servo_type: Type of the servo which was not found.
    """

    def __init__(self, message, servo_type):
        # Forward both arguments to the base class so that `args` holds
        # everything needed to rebuild the exception; without this the
        # object cannot be copied or pickled because the constructor
        # requires two positional arguments.
        super(MissingServoError, self).__init__(message, servo_type)
        self._servo_type = servo_type
        self.message = message

    def __str__(self):
        # Kept as repr() of the message to preserve the existing output.
        return repr(self.message)
class ServoTopology(object):
    """Class to read, generate and validate servo topology in the lab.

    The class supports detection of the servos listed in ST_PRODUCT_TYPES.
    To save the servo topology to host-info the data passes two steps:
       - convert to json
       - encode to base64
    """

    def __init__(self, servo_host):
        """Create an empty topology bound to a servo host.

        @params servo_host: Host object used to run commands; it has to
                            provide `servo_serial` and `run()`.
        """
        self._host = servo_host
        self._topology = None

    def read(self, host_info):
        """Read the servo topology saved in host-info.

        The current topology is reset first, so it stays empty when
        host_info is not provided or the stored value cannot be parsed.

        @params host_info: Host-info object holding the topology label.
        """
        logging.info('Reading servo topology info...')
        self._topology = None
        if not host_info:
            logging.info('The host_info not provided. Skip reading.')
            return
        b64_val = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
        self._topology = _parse_string_as_topology(b64_val)
        logging.debug('Loaded servo topology: %s', self._topology)
        if self._topology:
            logging.info('Servo topology loaded successfully.')

    def save(self, host_info_store):
        """Save the current servo topology to host-info.

        Nothing is written when the topology is empty, the store is not
        provided, the conversion failed or the stored value is unchanged.

        @params host_info_store: Store used to get and commit host-info.
        """
        if self.is_empty():
            logging.info('Topology is empty. Skip saving.')
            return
        if not host_info_store:
            logging.info('The host_info_store not provided. Skip saving.')
            return
        logging.info('Saving servo topology info...')
        data = _convert_topology_to_string(self._topology)
        if not data:
            logging.info('Servo topology fail to save data.'
                         ' Please file a bug.')
            return
        host_info = host_info_store.get()
        prev_value = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
        if prev_value and prev_value == data:
            logging.info('Servo topology was not changed. Skip saving.')
            return
        logging.debug('Previous saved topology: %s', prev_value)
        host_info.set_version_label(stc.SERVO_TOPOLOGY_LABEL_PREFIX, data)
        host_info_store.commit(host_info)
        logging.info('Servo topology saved successfully.')

    def generate(self):
        """Read servo data and create the topology.

        Best effort: any failure is logged and leaves the topology empty.
        """
        try:
            self._topology = self._generate()
        except Exception as e:
            self._topology = None
            logging.debug('(Not critical) %s', e)
            logging.info('Fail to generate servo-topology')
        if not self.is_empty():
            logging.info('Servo topology successfully generated.')

    def is_empty(self):
        """Return True when no topology data is loaded or generated."""
        return not bool(self._topology)

    def validate(self, raise_error=False, dual_set=False, compare=False):
        """Validate topology against expected topology.

        A fresh topology is generated from the host and validated against:
            - set-up expectation: min one child or 2 for DUAL_V4
            - last saved topology: check if any device missed

        @params raise_error: raise error if validate did not pass otherwise
                             return False.
        @params dual_set:    Check if servo expect DUAL_V4 setup.
        @params compare:     Validate against saved topology.

        @returns: True if the validation passed, False otherwise.
        @raises ServoTopologyError: validation failed and raise_error set.
        @raises MissingServoError: an expected child servo is missing and
                                   raise_error set.
        """
        new_st = self._generate()
        main_device = new_st.get(stc.ST_DEVICE_MAIN) if new_st else None
        if not main_device:
            message = 'Main device is not detected'
            return self._process_error(message, raise_error)
        children = new_st.get(stc.ST_DEVICE_CHILDREN)
        # basic setup has to have minimum one child.
        if not children:
            message = 'Each setup has at least one child'
            return self._process_error(message, raise_error)
        children_types = [c.get(stc.ST_DEVICE_TYPE) for c in children]
        # DUAL_V4 setup has to have cr50 and one more child.
        if dual_set:
            if stc.ST_CR50_TYPE not in children_types:
                return self._missing_servo_error(stc.ST_CR50_TYPE, raise_error)
            if len(children) < 2:
                message = 'Expected two children but have only one'
                return self._process_error(message, raise_error)
        if compare and not self.is_empty():
            saved = self._topology
            old_main = saved.get(stc.ST_DEVICE_MAIN)
            # The saved topology may have no children entry at all; treat
            # that as "no children" instead of failing on iteration.
            old_children = saved.get(stc.ST_DEVICE_CHILDREN) or []
            if not old_main or not old_main.get(stc.ST_DEVICE_HUB_PORT):
                # Old data is invalid for comparison.
                return True
            if not self._equal_item(old_main, main_device):
                message = 'Main servo was changed'
                return self._process_error(message, raise_error)
            for child in old_children:
                old_type = child.get(stc.ST_DEVICE_TYPE)
                if old_type not in children_types:
                    return self._missing_servo_error(old_type, raise_error)
            if len(children) < len(old_children):
                message = 'Some child is missed'
                return self._process_error(message, raise_error)
        logging.info('Servo topology successfully verified.')
        return True

    def _log_validation_failure(self, message):
        """Log a validation failure which is reported without raising."""
        logging.info('Validate servo topology failed with: %s', message)

    def _process_error(self, message, raise_error):
        """Report a validation failure.

        @params message:     Description of the failure.
        @params raise_error: Raise instead of returning False.

        @returns: False when raise_error is not set.
        @raises ServoTopologyError: when raise_error is set.
        """
        if not raise_error:
            self._log_validation_failure(message)
            return False
        raise ServoTopologyError(message)

    def _missing_servo_error(self, servo_type, raise_error):
        """Report that a servo of the expected type was not found.

        @params servo_type:  Type of the missing servo.
        @params raise_error: Raise instead of returning False.

        @returns: False when raise_error is not set.
        @raises MissingServoError: when raise_error is set.
        """
        message = 'Missed servo: %s!' % servo_type
        if not raise_error:
            self._log_validation_failure(message)
            return False
        raise MissingServoError(message, servo_type)

    def _equal_item(self, old, new):
        """Check if two topology items describe the same device.

        Only the fields listed in SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS are
        compared.

        @params old: Item from the saved topology.
        @params new: Item from the freshly generated topology.

        @returns: True if all compared fields are equal.
        """
        return all(
                old.get(field) == new.get(field)
                for field in stc.SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS)

    def _generate(self):
        """Generate and return topology structure.

        Read and generate topology structure without updating the state.

        @returns: dict with the main device and the list of children, or
                  None if the main servo cannot be detected or described.
        """
        logging.debug('Trying generate a servo-topology')
        core_servo_serial = self._host.servo_serial
        if not core_servo_serial:
            logging.info('Servo serial is not provided.')
            return None
        logging.debug('Getting topology for core servo: %s', core_servo_serial)
        # collect main device info
        cmd_hub = 'servodtool device -s %s usb-path' % core_servo_serial
        servo_path = self._read_line(cmd_hub)
        logging.debug('Device -%s path: %s', core_servo_serial, servo_path)
        if not servo_path:
            logging.info('Core servo not detected.')
            return None
        if not self._is_expected_type(servo_path):
            return None
        main_device = self._read_device_info(servo_path)
        if not main_device:
            logging.debug('Core device missed some data')
            return None
        # collect child device info
        children = []
        # Drop the last two characters of the device path to get the path
        # of the hub. NOTE(review): presumably this strips a '.<port>'
        # suffix and assumes a single-character port - confirm.
        hub_path = servo_path[0:-2]
        logging.debug('Core hub path: %s', hub_path)
        devices_cmd = 'find %s/* -name serial' % hub_path
        devices = self._read_multilines(devices_cmd)
        core_device_port = main_device.get(stc.ST_DEVICE_HUB_PORT)
        for device in devices:
            logging.debug('Child device %s', device)
            device_dir = os.path.dirname(device)
            if not self._is_expected_type(device_dir):
                # skip not expected device type like USB or hubs
                continue
            child = self._read_device_info(device_dir)
            if not child:
                logging.debug('Child missed some data.')
                continue
            if core_device_port == child.get(stc.ST_DEVICE_HUB_PORT):
                logging.debug('Skip device if match with core device')
                continue
            children.append(child)
        topology = {
                stc.ST_DEVICE_MAIN: main_device,
                stc.ST_DEVICE_CHILDREN: children
        }
        logging.debug('Servo topology: %s', topology)
        return topology

    def _is_expected_type(self, path):
        """Check if device type is known servo type.

        Please update ST_PRODUCT_TYPES to extend more servo types.

        @params path: Path to the device folder with the 'product' file.

        @returns: True if the product is listed in ST_PRODUCT_TYPES.
        """
        product = self._read_file(path, 'product')
        if bool(stc.ST_PRODUCT_TYPES.get(product)):
            return True
        logging.info('Unknown product: %s', product)
        return False

    def _read_device_info(self, path):
        """Read device details for topology.

        @params path: Absolute path to the device in FS.

        @returns: topology item, or None if some detail is missing.
        """
        serial = self._read_file(path, 'serial')
        product = self._read_file(path, 'product')
        hub_path = self._read_file(path, 'devpath')
        stype = stc.ST_PRODUCT_TYPES.get(product)
        return self._create_item(serial, stype, product, hub_path)

    def _create_item(self, servo_serial, servo_type, product, hub_path):
        """Create topology item.

        Return created item only if all details provided.

        @params servo_serial:   Serial number of device.
        @params servo_type:     Product type code of the device.
        @params product:        Product name of the device.
        @params hub_path:       Device enumerated folder name. Show the
                                chain of used ports to connect the device.

        @returns: dict describing the device, or None if a detail is empty.
        """
        item = {
                stc.ST_DEVICE_SERIAL: servo_serial,
                stc.ST_DEVICE_TYPE: servo_type,
                stc.ST_DEVICE_PRODUCT: product,
                stc.ST_DEVICE_HUB_PORT: hub_path
        }
        if not (servo_serial and servo_type and product and hub_path):
            logging.debug('Some data missing: %s', item)
            return None
        return item

    def _read_file(self, path, file_name):
        """Read content of the file and return result as one line.

        If execution finished with error result will be empty string.

        @params path:       Path to the folder where file located.
        @params file_name:  The file name to read.
        """
        if not path or not file_name:
            return ''
        f = os.path.join(path, file_name)
        return self._read_line('cat %s' % f)

    def _read_line(self, command):
        """Execute terminal command and return result as one line.

        If execution finished with error result will be empty string.

        @params command: String to execute.
        """
        r = self._host.run(command, ignore_status=True, timeout=30)
        if r.exit_status == 0:
            return r.stdout.strip()
        return ''

    def _read_multilines(self, command):
        """Execute terminal command and return result as multi-line.

        If execution finished with error result will be an empty array.

        @params command: String to execute.
        """
        r = self._host.run(command, ignore_status=True, timeout=30)
        if r.exit_status == 0:
            return r.stdout.splitlines()
        return []
def _convert_topology_to_string(topology):
"""Convert topology to the string respresentation.
Convert topology to json and encode by Base64 for host-info file.
@params topology: Servo topology data
@returns: topology representation in Base64 string
"""
if not topology:
return ''
try:
# generate json similar to golang to avoid extra updates
json_string = json.dumps(topology, separators=(',', ':'))
logging.debug('Servo topology (json): %s', json_string)
except Exception as e:
logging.debug('(Not critical) %s', e)
logging.info('Failed to convert topology to json')
return ''
try:
# recommended to convert to the bytes for python 3
b64_string = base64.b64encode(json_string.encode("utf-8"))
logging.debug('Servo topology (b64): %s', b64_string)
return b64_string
except Exception as e:
logging.debug('(Not critical) %s', e)
logging.info('Failed to convert topology to base64')
return ''
def _parse_string_as_topology(src):
"""Parse and load servo topology from string.
Decode Base64 and load as json of servo-topology data.
@params src: topology representation in Base64 string
@returns: servo topology data
"""
if not src:
logging.debug('Servo topology data not present in host-info.')
return None
try:
json_string = base64.b64decode(src)
logging.debug('Servo topology (json) from host-info: %s', json_string)
return json.loads(json_string)
except Exception as e:
logging.debug('(Not critical) %s', e)
logging.info('Fail to read servo-topology from host-info.')
return None