# blob: 6cb5a15a4e8a69f8f4a0111eaea79786d89ebd34
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utilities to run DUT Control commands, get values from Servo."""
import logging
import time
from chromite.lib import cros_build_lib
from chromite.lib.firmware import servo_lib
class Error(Exception):
    """Root of the exception hierarchy raised by this module."""
class InvalidServoVersionError(Error):
    """Raised when the reported servo type is not a recognized one."""
class DutConnectionError(Error):
    """Raised when a value cannot be fetched from the DUT."""
class DutControl:
    """Wrapper for dut_control calls.

    Every command issued through this class is prefixed with the
    `dut-control` executable and, when a port was supplied at construction
    time, with the matching `--port=<port>` flag.
    """

    def __init__(self, port):
        """Build the base command line shared by all dut-control calls.

        Args:
            port: The port to pass to dut-control. When falsy (e.g. None),
                no --port flag is added to the command line.
        """
        self._base_cmd = ['dut-control']
        if port:
            self._base_cmd.append('--port=%s' % port)

    def get_servo(self) -> servo_lib.Servo:
        """Get the Servo instance the given dut_control command is using.

        Returns:
            servo_lib.Servo: Built from the detected servo type and the
                serial value read back through dut-control.

        Raises:
            InvalidServoVersionError: The servo type is not listed in
                servo_lib.VALID_SERVOS.
            DutConnectionError: One of the dut-control queries failed.
        """
        servo_type = self.get_value('servo_type')
        if '_and_' in servo_type:
            # If servod is working with multiple interfaces then servo_type
            # will be along the lines of
            # "servo_v4p1_with_servo_micro_and_ccd_cr50".
            # We need to pick an interface, so grab everything before
            # "_and_".
            first_servo_type = servo_type.split('_and_')[0]
            logging.warning('Dual-mode servo detected. Treating %s as %s',
                            servo_type, first_servo_type)
            servo_type = first_servo_type
        if servo_type not in servo_lib.VALID_SERVOS:
            raise InvalidServoVersionError('Unrecognized servo version: %s' %
                                           servo_type)
        # The control that holds the serial number depends on the servo
        # type, so look up its name first and then query it.
        option = servo_lib.get_serial_option(servo_type)
        serial = self.get_value(option)
        return servo_lib.Servo(servo_type, serial)

    def get_value(self, arg):
        """Get the value of |arg| from dut_control.

        Args:
            arg (str): The dut-control control name to query.

        Returns:
            str: The text following the first ':' in the command output,
                with surrounding whitespace stripped. Empty when the output
                contains no ':'.

        Raises:
            DutConnectionError: The dut-control command failed.
        """
        try:
            result = cros_build_lib.run(
                self._base_cmd + [arg], stdout=True, encoding='utf-8')
        except cros_build_lib.CalledProcessError as e:
            logging.debug('dut-control error: %s', e)
            # Chain explicitly so the underlying command failure is kept as
            # the cause of the friendlier error shown to the user.
            raise DutConnectionError(
                'Could not establish servo connection. Verify servod is '
                'running in the background, and the servo is properly '
                'connected.') from e
        # Return value from the "key:value" output.
        return result.stdout.partition(':')[2].strip()

    def run(self, cmd_fragment, verbose=False, dryrun=False):
        """Run a dut_control command.

        The base command (`dut-control` plus any port flag) is prepended
        here, so |cmd_fragment| holds only the arguments that follow it.

        Args:
            cmd_fragment (list[str]): The dut_control command to run.
            verbose (bool): Whether to print the command before it's run.
            dryrun (bool): Whether to actually execute the command or just
                print it.
        """
        cros_build_lib.run(
            self._base_cmd + cmd_fragment, print_cmd=verbose, dryrun=dryrun)

    def run_all(self, cmd_fragments, verbose=False, dryrun=False):
        """Run multiple dut_control commands in the order given.

        Args:
            cmd_fragments (list[list[str]]): The dut_control commands to
                run.
            verbose (bool): Whether to print the commands as they are run.
            dryrun (bool): Whether to actually execute the command or just
                print it.
        """
        for cmd in cmd_fragments:
            self.run(cmd, verbose=verbose, dryrun=dryrun)

    def servo_run(self, dut_cmd_on, dut_cmd_off, flash_cmd, verbose, dryrun):
        """Runs subprocesses for setting dut controls and executing flash_cmd.

        Args:
            dut_cmd_on ([[str]]): 2d array of dut-control argument lists
                in the form [['cmd1', 'cmd2'...], ['cmd3'...]]
                that get executed before the flash_cmd. The dut-control
                executable and port flag are prepended to each entry
                automatically and must not be included.
            dut_cmd_off ([[str]]): 2d array of dut-control argument lists
                in the same form that get executed after the flash_cmd.
            flash_cmd ([str]): array containing all arguments for
                the actual command. Run as root user on host.
            verbose (bool): if True then print out the various
                commands before running them.
            dryrun (bool): if True then print the commands without
                executing.

        Returns:
            bool: True if commands were run successfully, otherwise False.
                Only cros_build_lib.CalledProcessError is reported this
                way; any other exception propagates after the dut-off
                commands have been attempted.
        """
        success = True
        try:
            # Dut on command runs.
            self.run_all(dut_cmd_on, verbose=verbose, dryrun=dryrun)
            # Need to wait for SPI chip power to stabilize (for some designs)
            time.sleep(1)
            # Run the flash command.
            cros_build_lib.sudo_run(
                flash_cmd, print_cmd=verbose, dryrun=dryrun)
        except cros_build_lib.CalledProcessError:
            logging.error(
                'DUT command failed, see output above for more info.')
            success = False
        finally:
            # Run the dut off commands to clean up state if possible.
            try:
                self.run_all(dut_cmd_off, verbose=verbose, dryrun=dryrun)
            except cros_build_lib.CalledProcessError:
                logging.error(
                    'DUT cmd off failed, see output above for more info.')
                success = False
        return success