| # Copyright 2021 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Utilities to run DUT Control commands, get values from Servo.""" |
| |
| import logging |
| import time |
| |
| from chromite.lib import cros_build_lib |
| from chromite.lib.firmware import servo_lib |
| |
| |
class Error(Exception):
    """Common base class for the exceptions defined in this module."""
| |
| |
class InvalidServoVersionError(Error):
    """Raised when the reported servo version is not a recognized one."""
| |
| |
class DutConnectionError(Error):
    """Raised when data cannot be fetched from a dut."""
| |
| |
class DutControl:
    """Wrapper for dut_control calls.

    Every command issued through this class is built from a shared base
    command line: `dut-control`, plus `--port=<port>` when a port was given.
    """

    def __init__(self, port):
        """Build the base dut-control command line.

        Args:
            port: Value passed to dut-control as `--port=<port>`. When falsy
                (e.g. None), no port argument is added.
        """
        self._base_cmd = ['dut-control']
        if port:
            self._base_cmd.append('--port=%s' % port)

    def get_servo(self) -> 'servo_lib.Servo':
        """Get the Servo instance the given dut_control command is using.

        Returns:
            servo_lib.Servo: Built from the detected servo type and the
                value of the serial option servo_lib selects for that type.

        Raises:
            InvalidServoVersionError: The detected servo type is not listed
                in servo_lib.VALID_SERVOS.
            DutConnectionError: A dut-control query failed.
        """
        servo_type = self.get_value('servo_type')

        if '_and_' in servo_type:
            # If servod is working with multiple interfaces then servo_type
            # will be along the lines of
            # "servo_v4p1_with_servo_micro_and_ccd_cr50".
            # We need to pick an interface, so grab everything before
            # "_and_".
            first_servo_type = servo_type.split('_and_')[0]
            logging.warning('Dual-mode servo detected. Treating %s as %s',
                            servo_type, first_servo_type)
            servo_type = first_servo_type

        if servo_type not in servo_lib.VALID_SERVOS:
            raise InvalidServoVersionError(
                'Unrecognized servo version: %s' % servo_type)

        option = servo_lib.get_serial_option(servo_type)
        serial = self.get_value(option)
        return servo_lib.Servo(servo_type, serial)

    def get_value(self, arg):
        """Get the value of |arg| from dut_control.

        Args:
            arg (str): The dut-control control name to query.

        Returns:
            str: The text after the first ':' of the command output, with
                surrounding whitespace stripped. Empty if the output
                contains no ':'.

        Raises:
            DutConnectionError: The dut-control invocation raised
                CalledProcessError.
        """
        try:
            result = cros_build_lib.run(
                self._base_cmd + [arg], stdout=True, encoding='utf-8')
        except cros_build_lib.CalledProcessError as e:
            logging.debug('dut-control error: %s', e)
            # Chain the original failure so it stays visible in tracebacks.
            raise DutConnectionError(
                'Could not establish servo connection. Verify servod is '
                'running in the background, and the servo is properly '
                'connected.') from e

        # Return value from the "key:value" output.
        return result.stdout.partition(':')[2].strip()

    def run(self, cmd_fragment, verbose=False, dryrun=False):
        """Run a dut_control command.

        Args:
            cmd_fragment (list[str]): The dut_control arguments to run. The
                base command (`dut-control` and any port option) is
                prepended here, so it must not be part of the fragment.
            verbose (bool): Whether to print the command before it's run.
            dryrun (bool): Whether to actually execute the command or just
                print it.
        """
        cros_build_lib.run(
            self._base_cmd + cmd_fragment, print_cmd=verbose, dryrun=dryrun)

    def run_all(self, cmd_fragments, verbose=False, dryrun=False):
        """Run multiple dut_control commands in the order given.

        An exception from one command propagates immediately, so the
        commands after it are not run.

        Args:
            cmd_fragments (list[list[str]]): The dut_control commands to
                run, each in the form accepted by run().
            verbose (bool): Whether to print the commands as they are run.
            dryrun (bool): Whether to actually execute the command or just
                print it.
        """
        for cmd in cmd_fragments:
            self.run(cmd, verbose=verbose, dryrun=dryrun)

    def servo_run(self, dut_cmd_on, dut_cmd_off, flash_cmd, verbose, dryrun):
        """Runs subprocesses for setting dut controls and executing flash_cmd.

        The dut_cmd_off commands are attempted even when the "on" commands
        or the flash command failed.

        Args:
            dut_cmd_on ([[str]]): 2d array of dut-control argument lists in
                the form [['cmd1', 'cmd2'...], ['cmd3'...]] that get
                executed before flash_cmd. The `dut-control` executable and
                port option are prepended automatically and must not be
                included.
            dut_cmd_off ([[str]]): 2d array of dut-control argument lists
                in the same form that get executed after flash_cmd.
            flash_cmd ([str]): array containing all arguments for
                the actual command. Run via cros_build_lib.sudo_run.
            verbose (bool): if True then print out the various
                commands before running them.
            dryrun (bool): if True then print the commands without
                executing.

        Returns:
            bool: True if commands were run successfully, otherwise False.
                Only CalledProcessError is turned into a False result; any
                other exception propagates to the caller.
        """
        success = True
        try:
            # Dut on command runs.
            self.run_all(dut_cmd_on, verbose=verbose, dryrun=dryrun)

            # Need to wait for SPI chip power to stabilize (for some designs)
            time.sleep(1)

            # Run the flash command.
            cros_build_lib.sudo_run(
                flash_cmd, print_cmd=verbose, dryrun=dryrun)
        except cros_build_lib.CalledProcessError:
            logging.error(
                'DUT command failed, see output above for more info.')
            success = False
        finally:
            # Run the dut off commands to clean up state if possible.
            try:
                self.run_all(dut_cmd_off, verbose=verbose, dryrun=dryrun)
            except cros_build_lib.CalledProcessError:
                logging.error(
                    'DUT cmd off failed, see output above for more info.')
                success = False

        return success